Source code for schrodinger.application.matsci.mlearn.features

"""
Classes and functions to deal with ML features.

Copyright Schrodinger, LLC. All rights reserved."""

import argparse
import itertools
import math
import matplotlib
import operator
import os
import random
import shutil
from collections import defaultdict
from collections import namedtuple

import numpy
from matminer.featurizers.site import CrystalNNFingerprint
from matminer.featurizers.structure import SiteStatsFingerprint

from schrodinger import structure
from schrodinger import surface
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import clusterstruct as cstruct
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msconst
from schrodinger.application.matsci import reaction_workflow_utils as rxnwfu
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci.elementalprops import ElementalProperties
from schrodinger.application.matsci import \
    genetic_optimization as go
from schrodinger.application.matsci.mlearn.base import BaseFeaturizer
from schrodinger.application.matsci.nano import xtal
from schrodinger.application.matsci import rxn_channel
from schrodinger.infra import structure as infrastructure
from schrodinger.job import queue
from schrodinger.structutils import analyze
from schrodinger.structutils import transform
from schrodinger.structutils.measure import measure_bond_angle
from schrodinger.structutils.measure import measure_distance
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess

X_VEC = numpy.array(transform.X_AXIS)
Z_VEC = numpy.array(transform.Z_AXIS)

RESERVED_JAGUAR_KEYWORDS = {'ldips': 5, 'icfit': 1, 'fukui': 1}

MomentData = namedtuple('MomentData', ['flag', 'components', 'header', 'units'])

# yapf: disable
MOMENTS_DATA = [
    MomentData(flag='-dipolecomp', components=('X', 'Y', 'Z'), header='dipole',
        units='Debye'),
    MomentData(flag='-quadrupole', components=('XX', 'YY', 'ZZ', 'XY', 'XZ', 'YZ'),
        header='quadrupole', units='Debye * Ang.'),
    MomentData(flag='-trlessquad', components=('XX-YY', '2ZZ-XX-YY', 'XY', 'XZ', 'YZ'),
        header='traceless_quadrupole', units='Debye * Ang.'),
    MomentData(flag='-octupole', components=('XXX', 'YYY', 'ZZZ', 'XYY', 'XXY',
        'XXZ', 'XZZ', 'YZZ', 'YYZ', 'XYZ'), header='octupole', units='Debye * Ang.^2'),
    MomentData(flag='-trlessoct', components=('XXX', 'YYY', 'ZZZ', 'XYZ', 'XYY-XZZ',
        'XXY-YZZ', 'XXZ-YYZ'), header='traceless_octupole', units='Debye * Ang.^2'),
    MomentData(flag='-hexadecapole', components=('XXXX', 'YYYY', 'ZZZZ', 'XXXY',
        'XXXZ', 'YYYX', 'YYYZ', 'ZZZX', 'ZZZY', 'XXYY', 'XXZZ', 'YYZZ', 'XXYZ',
        'YYXZ', 'ZZXY'), header='hexadecapole', units='Debye * Ang.^3')
]
# yapf: enable

DescriptorUtility = namedtuple('DescriptorUtilitity', [
    'job_class', 'script_name', 'in_mae_flag', 'out_mae_flag', 'other_flags',
    'property_families', 'descriptor_header'
])

JAGUAR_PROP_FAMILY = '_j_'
SKIPPING_MSG = 'Skipping the descriptors for all structures.'
INPUT_OUTPUT_ERROR = ('The number of structures in the input and output '
                      'files for {0} is not the same. ') + SKIPPING_MSG

# the moldescriptors utility necessarily runs under JobControl even when
# using -NOJOBID therefore use JobDJ as opposed to subprocess.check_call
# and specify all job class types

LIGFILTER_DU = DescriptorUtility(job_class=jobutils.RobustSubmissionJob,
                                 script_name='ligfilter',
                                 in_mae_flag=None,
                                 out_mae_flag='-o',
                                 other_flags=['-add_descriptors'],
                                 property_families={'ligfilter'},
                                 descriptor_header='ligfilter')

CANVAS_DU = DescriptorUtility(
    job_class=jobutils.RobustSubmissionJob,
    script_name='canvasMolDescriptors',
    in_mae_flag='-imae',
    out_mae_flag='-omae',
    other_flags=['-All', '-JOB', 'canvas_mol_descriptors'],
    property_families={'canvas'},
    descriptor_header='canvas')

# the following also supports a qikprop option
MOLDESCRIPTORS_DU = DescriptorUtility(job_class=jobutils.RobustSubmissionJob,
                                      script_name='moldescriptors',
                                      in_mae_flag=None,
                                      out_mae_flag='-omae',
                                      other_flags=['-topo'],
                                      property_families={'desc', 'mopac', 'qp'},
                                      descriptor_header='moldescriptors')


[docs]def get_distance_cell(struct, cutoff): """ Create an infrastructure Distance Cell. Struct MUST have the Chorus box properties. :type struct: `schrodinger.structure.Structure` :param struct: Input structure :type cutoff: float :param cutoff: The cutoff for finding nearest neighbor atoms :rtype: `schrodinger.structure.Structure`, , `schrodinger.infra.structure.DistanceCell`, `schrodinger.infra.structure.PBC` :return: Supercell, an infrastructure Distance Cell that accounts for the PBC, and the pbc used to create it. :raise: ValueError if struct is missing PBCs """ xtal_kwargs = { 'bonding': xtal.ParserWrapper.CHOICE_OFF, 'bond_orders': xtal.ParserWrapper.CHOICE_OFF, 'translate': xtal.ParserWrapper.CHOICE_OFF, } # Get params from chorus params = xtal.get_params_from_chorus(xtal.get_chorus_properties(struct)) min_lattice = min(params[:3]) if cutoff > min_lattice: # round up to the closest integer extents = [int(numpy.ceil(cutoff / min_lattice))] * 3 supercell = xtal.get_cell(struct.copy(), extents=extents, xtal_kwargs=xtal_kwargs) else: supercell = struct.copy() pbc = cstruct.create_pbc(supercell) cell = infrastructure.DistanceCell(supercell, cutoff, pbc) return supercell, cell, pbc
[docs]def elemental_generator(struct, element, is_equal=True): comparison = operator.eq if is_equal else operator.ne return (x for x in struct.atom if comparison(x.element, element))
[docs]def get_anion(struct): """ Get the most electronegative element in the structure (anion). :type struct: `schrodinger.structure.Structure` :param struct: Input structure :rtype: str, float, int :return: Element, it's electronegativity, number of anions in the cell """ eneg = -numpy.inf anion = None elements_dict = defaultdict(int) for atom in struct.atom: element_eneg = ElementalProperties(atom.element).eneg elements_dict[atom.element] += 1 if element_eneg > eneg: anion = atom.element eneg = element_eneg return anion, eneg, elements_dict[atom.element]
[docs]class LatticeFeatures(BaseFeaturizer): """ Class to generate lattice-based features. """ # These features are defined in the SI of 10.1039/c6ee02697d # Sendek et al, Energy Environ. Sci., 2017,10, 306-320 FEATURES = {'avgAtomicVol': 'Average atomic volume', # S 1.1 'avgNeighborCount': 'Average neighbor count', # S 1.5 'stdNeighborCount': 'Standard deviation of neighbor count', # S 1.2 'avgSublatticeNeighborCount': 'Average sublattice neighbor count', # S 1.8 'avgNeighborIon': 'Average neighbor ionicity', # S 1.4 'stdNeighborIon': 'Standard deviation of neighbor ionicity', # S 1.3 'avgSublatticeNeighborIon': 'Average sublattice neighbor ionicity', # S 1.7 'volPerAnion': 'Volume per anion', # S 1.11 'avgSublatticeEneg': 'Average sublattice electronegativity', # S 1.14 'packingFraction': 'Crystal packing fraction', # S 1.16 'avgElementNeighborCount': 'Average cation count', # S 1.6 'avgAnionAnionShortDistance': 'Average anion anion shortest distance', # S 1.10 'avgElementAnionShortDistance': 'Average cation anion shortest distance', # S 1.12 'avgShortDistance': 'Average cation cation shortest distance', # S 1.13 'anionFrameCoordination': 'Anion frame coordination', # S 1.9 'sublatticePackingFraction': 'Sublattice packing fraction', # S.15 'pathWidth': 'Average straight-line path width', # S 1.17 'pathWidthEneg': 'Average straight-line path electronegativity', # S 1.18 'ratioIonicity': 'Ratio of average cation to sublattice electronegativity', # S 1.19 'ratioCount': 'Ratio of average cation to sublattice count', # S 1.20 } # yapf: disable
[docs] def __init__(self, features, element='Li', cutoff=4.0): """ Initialize the object. """ # Note that all the passed kwargs must be assigned to self.var !! # This is a requirement of sklearn. self.element = element self.cutoff = cutoff self.features = list(features) # Ensure that all of the features are known assert not set(self.features).difference(self.FEATURES)
[docs] def runFeature(self, feature): """ Get result from a feature. :type feature: str :param: feature: One of the features listed in FEATURES. :rtype: int or float :return: Feature value """ assert feature in self.FEATURES return getattr(self, feature)()
[docs] def transform(self, structs): """ Get numerical features from structures. Also sets features names in self.labels. See parent class for more documentation. :type structs: list(`schrodinger.structure.Structure`) :param structs: List of structures to be featurized :rtype: numpy array of shape [n_samples, n_features] :return: Transformed array """ output = [] self.labels = [] for idx, struct in enumerate(structs): # This MUST be set per structure, this are used in features # calculation self.struct = struct self.scell, self.sdcell, self.spbc = get_distance_cell( self.struct, self.cutoff) output.append([]) for key in self.features: val = self.FEATURES[key] self.labels.append(val) output[idx].append(self.runFeature(key)) return numpy.array(output)
[docs] def avgAtomicVol(self): """ Get average atomic volume. :type struct: `schrodinger.structure.Structure` :param struct: Structure to be used for feature calculation :rtype: float :return: Average atomic volume (A^3) """ vecs = xtal.get_vectors_from_chorus(self.struct) vol = xtal.get_volume_from_vecs(vecs) return vol / float(self.struct.atom_total)
[docs] def avgNeighborCount(self): """ Get average neighbor count. :rtype: float :return: Average neighbor count """ natoms_type = 0 neighbors = 0 for iatom in elemental_generator(self.struct, self.element): natoms_type += 1 # -1 since query_atoms matches also contains atom itself neighbors += len(self.sdcell.query_atoms(*iatom.xyz)) - 1 if natoms_type == 0: return 0. return neighbors / float(natoms_type)
[docs] def stdNeighborCount(self): """ Get standard deviation of neighbor count. :rtype: float :return: Average neighbor count """ neighbors = [] for iatom in elemental_generator(self.struct, self.element): # -1 since query_atoms matches also contains atom itself neighbors.append(len(self.sdcell.query_atoms(*iatom.xyz)) - 1) if len(neighbors) in [0, 1]: return 0. return numpy.std(neighbors, ddof=1)
[docs] def avgSublatticeEneg(self): """ Get average sublattice electronegativity. :rtype: float :return: Average sublattice electronegativity """ enegs = [] for iatom in elemental_generator(self.struct, self.element, is_equal=False): enegs.append(ElementalProperties(iatom.element).eneg) if not enegs: return 0. return sum(enegs) / float(len(enegs))
[docs] def avgSublatticeNeighborCount(self): """ Get average sublattice neighbor count. :rtype: float :return: Average sublattice neighbor count """ natoms_type = 0 neighbors = 0 for iatom in elemental_generator(self.scell, self.element, is_equal=False): natoms_type += 1 # -1 since query_atoms matches also contains atom itself neighbors += len(self.sdcell.query_atoms(*iatom.xyz)) - 1 if natoms_type == 0: return 0. return neighbors / float(natoms_type)
[docs] def avgNeighborIon(self): """ Get average neighbor ionicity. :rtype: float :return: Average neighbor ionicity """ element_eneg = ElementalProperties(self.element).eneg nbonds = 0 avg_eneg = 0. for iatom in elemental_generator(self.scell, self.element): for jatom in self.sdcell.query_atoms(*iatom.xyz): if iatom.index == jatom.getIndex(): continue nbonds += 1 eneg = ElementalProperties( self.scell.atom[jatom.getIndex()].element).eneg avg_eneg += abs(element_eneg - eneg) if nbonds == 0: return 0. return avg_eneg / float(nbonds)
[docs] def stdNeighborIon(self): """ Get standard deviation of neighbor ionicity. :rtype: float :return: Average neighbor ionicity """ element_eneg = ElementalProperties(self.element).eneg enegs = [] for iatom in elemental_generator(self.scell, self.element): for jatom in self.sdcell.query_atoms(*iatom.xyz): if iatom.index == jatom.getIndex(): continue eneg = ElementalProperties( self.scell.atom[jatom.getIndex()].element).eneg enegs.append(abs(element_eneg - eneg)) if len(enegs) in [0, 1]: return 0. return numpy.std(enegs, ddof=1)
[docs] def avgSublatticeNeighborIon(self): """ Get average sublattice neighbor ionicity. :rtype: float :return: Average sublattice neighbor count """ nbonds = 0 avg_eneg = 0. for iatom in elemental_generator(self.scell, self.element, is_equal=False): element_eneg = ElementalProperties(iatom.element).eneg for jatom in self.sdcell.query_atoms(*iatom.xyz): if iatom.index == jatom.getIndex(): continue nbonds += 1 eneg = ElementalProperties( self.scell.atom[jatom.getIndex()].element).eneg avg_eneg += abs(element_eneg - eneg) if nbonds == 0: return 0. return avg_eneg / float(nbonds)
[docs] def volPerAnion(self): """ Get volume per anion. :rtype: float :return: Volume per anion """ anion, eneg, nanions = get_anion(self.struct) vecs = xtal.get_vectors_from_chorus(self.struct) vol = xtal.get_volume_from_vecs(vecs) return vol / float(nanions)
[docs] def packingFraction(self, skip_element=None): """ Get packing fraction of the crystal. :type skip_element: str :param skip_element: Element to skip :rtype: float :return: Packing fraction """ total_radius = 0. for atom in self.struct.atom: if atom.element == skip_element: continue eff_radius = self.effectiveRadius(atom) if eff_radius == 0: return 0. total_radius += eff_radius**3 vecs = xtal.get_vectors_from_chorus(self.struct) vol = xtal.get_volume_from_vecs(vecs) return 4 * numpy.pi * total_radius / (3 * vol)
[docs] def effectiveRadius(self, atom): """ Get atom effective radius. :type atom: schrodinger.structure._StructureAtom :param atom: Atom :rtype: float :return: Effective radius """ properties = ElementalProperties(atom.element) eneg = properties.eneg try: cov_radius = properties.cradius ion_radius = properties.iradius except (ValueError, KeyError) as err: return 0. enegs = [] for jatom in self.sdcell.query_atoms(*atom.xyz): if atom.index == jatom.getIndex(): continue jelem = self.scell.atom[jatom.getIndex()].element jeneg = ElementalProperties(jelem).eneg enegs.append(abs(eneg - jeneg)) if not enegs: return 0. bond_ionicity = numpy.average(enegs) return cov_radius if bond_ionicity < 2 else ion_radius
[docs] def sublatticePackingFraction(self): """ Get packing fraction of the sublattice crystal. :rtype: float :return: Packing fraction """ return self.packingFraction(skip_element=self.element)
[docs] def avgElementNeighborCount(self): """ Get average element neighbor count. :rtype: float :return: Average number of bonds per element """ natoms_type = 0 neighbors = 0 for iatom in elemental_generator(self.struct, self.element): natoms_type += 1 matches = self.sdcell.query_atoms(*iatom.xyz) # -1 since query_atoms matches also contains atom itself neighbors += len([ x for x in matches if self.scell.atom[x.getIndex()].element == self.element ]) - 1 if natoms_type == 0: return 0. return neighbors / float(natoms_type)
[docs] def avgAnionAnionShortDistance(self): """ Get average anion anion shortest distance. :rtype: float :return: Average anion anion shortest distance """ anion, eneg, nanions = get_anion(self.struct) distances = [] for iatom in elemental_generator(self.struct, anion): min_dist = numpy.inf for jatom in elemental_generator(self.scell, anion): if iatom.index == jatom.index: continue distance = self.spbc.getDistance(self.struct, iatom, self.scell, jatom) min_dist = min(min_dist, distance) if min_dist == numpy.inf: continue distances.append(min_dist) if not distances: return 0. return numpy.average(distances)
[docs] def avgElementAnionShortDistance(self): """ Get average element anion shortest distance. :rtype: float :return: Average element anion shortest distance """ anion, eneg, nanions = get_anion(self.struct) distances = [] for iatom in elemental_generator(self.struct, self.element): min_dist = numpy.inf for jatom in elemental_generator(self.scell, anion): if iatom.index == jatom.index: continue distance = self.spbc.getDistance(self.struct, iatom, self.scell, jatom) min_dist = min(min_dist, distance) if min_dist == numpy.inf: continue distances.append(min_dist) if not distances: return 0. return numpy.average(distances)
[docs] def avgShortDistance(self): """ Get average element element shortest distance. :rtype: float :return: Average element element shortest distance """ distances = [] for iatom in elemental_generator(self.struct, self.element): min_dist = numpy.inf for jatom in elemental_generator(self.scell, self.element): if iatom.index == jatom.index: continue distance = self.spbc.getDistance(self.struct, iatom, self.scell, jatom) min_dist = min(min_dist, distance) if min_dist == numpy.inf: continue distances.append(min_dist) if not distances: return 0. return numpy.average(distances)
[docs] def anionFrameCoordination(self): """ Get anion framework coordination. :rtype: float :return: Anion framework coordination """ anion, eneg, nanions = get_anion(self.struct) afc = 0. nanions = 0 for iatom in elemental_generator(self.struct, anion): min_dist = numpy.inf for jatom in elemental_generator(self.scell, anion): if iatom.index == jatom.index: continue distance = self.spbc.getDistance(self.struct, iatom, self.scell, jatom) min_dist = min(min_dist, distance) if min_dist == numpy.inf: continue cutoff = min_dist + 1.0 supercell, cell, pbc = get_distance_cell(self.struct, cutoff) # -1 since query_atoms matches also contains atom itself afc += len([ x for x in cell.query_atoms(*iatom.xyz) if supercell.atom[x.getIndex()].element == anion ]) - 1 nanions += 1 if nanions == 0: return 0. return afc / float(nanions)
[docs] def pathWidth(self, eval_eneg=False): """ Evaluate average straight line path width. See the reference in the constructor for more info. :type eval_eneg: bool :param eval_eneg: If True, return average over electronegativity, instead of distance :rtype: float :return: Average path or electronegativity """ path_widths = [] path_enegs = [] for iatom in elemental_generator(self.scell, self.element): closest_distance = numpy.inf closest_atom = None for jatom in elemental_generator(self.scell, self.element): if jatom.index == iatom.index: continue distance = measure_distance(iatom, jatom) if distance < closest_distance: closest_distance = distance closest_atom = jatom if not closest_atom: continue path_width = numpy.inf path_atom = None for atom in self.scell.atom: if atom in [iatom, closest_atom]: continue # The goal is to get side2 of a triangle: iatom, atom and the # normal to the iatom - closest_atom line # atom # / | # / | side2 # / | # iatom <--- closest_distance ---> closest_atom angle1 = measure_bond_angle(closest_atom, iatom, atom) if not 0. < angle1 < 90.: continue angle2 = measure_bond_angle(iatom, closest_atom, atom) if not 0. < angle2 < 90.: continue hypotenuse = measure_distance(iatom, atom) side2 = abs(numpy.cos(numpy.radians(angle1))) * hypotenuse eff_radius = self.effectiveRadius(atom) if eff_radius == 0.: return 0. side2 = side2 - eff_radius if 0 < side2 < path_width: path_width = side2 path_atom = atom if not path_atom: continue path_widths.append(path_width) path_enegs.append(ElementalProperties(path_atom.element).eneg) if not path_widths: return 0. return (numpy.average(path_enegs) if eval_eneg else numpy.average(path_widths))
[docs] def pathWidthEneg(self): """ Evaluate average straight line path electronegativity. :rtype: float :return: Average electronegativity along the path """ return self.pathWidth(eval_eneg=True)
[docs] def ratioIonicity(self): """ Get ratio ionicity. :rtype: float :return: Ratio ionicity """ sublattice = self.avgSublatticeNeighborIon() if sublattice == 0.: return 0. return self.avgNeighborIon() / sublattice
[docs] def ratioCount(self): """ Get ratio neighbor count. :rtype: float :return: Ratio neighbor count """ sublattice = self.avgSublatticeNeighborCount() if sublattice == 0.: return 0. return self.avgNeighborCount() / sublattice
[docs]class Ligand(object): """ Manage a ligand. """
[docs] def __init__(self, st, metal_atom, new_to_old, coordination_idxs): """ Create an instance. :type st: `schrodinger.structure.Structure` :param st: the structure :type metal_atom: `schrodinger.structure._StructureAtom` :param metal_atom: the metal atom :type new_to_old: dict :param new_to_old: the map of new indices (extracted ligand) to old indices (original structure) :type coordination_idxs: list :param coordination_idxs: contains groups of indicies (new indices) of coordinating atoms """ self.st = st self.metal_atom = metal_atom self.new_to_old = new_to_old self.coordination_idxs = coordination_idxs
[docs] def getVec(self, point): """ Return a vector pointing from the metal atom to the given point. :type point: `numpy.array` :param point: the point in Ang. :rtype: `numpy.array` :return: the vector in Ang. """ return point - numpy.array(self.metal_atom.xyz)
[docs] def getCentroid(self, st, idxs): """ Return the centroid vector of the given coordination atom indices. :type st: `schrodinger.structure.Structure` :param st: the structure :type idxs: list :param idxs: the coordination indices :rtype: `numpy.array` :return: the centroid vector in Ang. """ return transform.get_centroid(st, atom_list=idxs)[:3]
[docs] def getCoordinationVec(self, st, idxs): """ Return a coordination vector pointing from the metal atom to the centroid of the given coordination atom indices. :type st: `schrodinger.structure.Structure` :param st: the structure :type idxs: list :param idxs: the coordination indices :rtype: `numpy.array` :return: the coordination vector in Ang. """ return self.getVec(self.getCentroid(st, idxs))
[docs] def getStoichiometry(self): """ Return the stoichiometry. :rtype: str :return: the stoichiometry """ return analyze.generate_molecular_formula(self.st)
[docs] def getDenticity(self): """ Return the denticity. :rtype: int :return: the denticity """ return len(self.coordination_idxs)
[docs] def getHapticity(self): """ Return the hapticity. :rtype: int :return: the hapticity """ return max( len(coordination_idxs) for coordination_idxs in self.coordination_idxs)
[docs] def getHapticCharacter(self): """ Return the haptic character. :rtype: int :return: the haptic character """ characters = [] for coordination_idxs in self.coordination_idxs: bond_lengths = [] for coordination_idx in coordination_idxs: coordination_atom = self.st.atom[coordination_idx] bond_length = transform.get_vector_magnitude( self.getVec(numpy.array(coordination_atom.xyz))) bond_lengths.append(bond_length) min_bond_length = min(bond_lengths) character = sum(1 for bond_length in bond_lengths if abs(bond_length - min_bond_length) <= 0.05) characters.append(character) return max(characters)
[docs] def getBiteAngle(self): """ Return the bite angle in degrees. :rtype: float or None :return: the bite angle in degrees """ # follows from https://en.wikipedia.org/wiki/Bite_angle if self.getDenticity() == 1: return bite_angles = [] for pair in itertools.combinations(self.coordination_idxs, 2): vec_idxs, vec_jdxs = map( lambda x: self.getCoordinationVec(self.st, x), pair) bite_angle = transform.get_angle_between_vectors(vec_idxs, vec_jdxs) bite_angles.append(bite_angle) return math.degrees(numpy.mean(bite_angles))
[docs] def getAtomConeAngle(self, atom): """ Return the cone angle for the given atom in degrees. :type atom: `schrodinger.structure._StructureAtom` :param atom: the atom :rtype: float :return: the cone angle for the given atom in degrees """ vec = numpy.array(atom.xyz) proj_z = numpy.dot(vec, Z_VEC) * Z_VEC proj_xy = vec - proj_z if transform.get_vector_magnitude(proj_xy) < 1E-3: scaled_proj_xy = atom.radius * X_VEC else: scaled_proj_xy = proj_xy + atom.radius * transform.get_normalized_vector( proj_xy) scaled_vec = proj_z + scaled_proj_xy return 2 * math.degrees( transform.get_angle_between_vectors(scaled_vec, Z_VEC))
[docs] def getConeAngle(self): """ Return the cone angle in degrees. :rtype: float :return: the cone angle in degrees """ # follows from https://en.wikipedia.org/wiki/Ligand_cone_angle, however # the convention for reporting cone angles is not to use 3D solid angles # in unitless steradians but rather simply 2D angles in degrees (twice # the angle to the cone's central axis) # the following cone angles in units of degrees are between maximally # subtending ligand branch atoms and the z-axis cone_angles = [] # loop over groups of coordinating atoms that are bonded to one another, # for example where ligand hapticity > 1 for coordination_idxs in self.coordination_idxs: # rotate the ligand so that the metal-to-centroid vector is along # the z-axis and translate so that the metal atom would be at the # origin coordination_vec = self.getCoordinationVec(self.st, coordination_idxs) rot_matrix = transform.get_alignment_matrix(coordination_vec, Z_VEC) st = self.st.copy() transform.transform_structure(st, rot_matrix) dx, dy, dz = map(lambda x: -1 * x, self.getCentroid(st, coordination_idxs)) dz += transform.get_vector_magnitude(coordination_vec) transform.translate_structure(st, x=dx, y=dy, z=dz) # loop over coordinating atoms in the current coordinating group for coordination_idx in coordination_idxs: coordination_atom = st.atom[coordination_idx] # if this entire ligand is a single atom then make an arbitrary # vector and obtain the single angle if not coordination_atom.bond_total: cone_angles.append(self.getAtomConeAngle(coordination_atom)) # otherwise loop over ligand branches eminating from the current # coordinating atom and collect angles for branch_start_atom in coordination_atom.bonded_atoms: if branch_start_atom.index in coordination_idxs: continue branch_idxs = rxn_channel.indicies_from_bonds_deep( st, branch_start_atom.index, exclude_atom_indicies=[coordination_idx]) # FIXME - it is not clear from the documentation how to handle # a branch containing sub-branches that both do and do not # coordinate (see the following classification) # loop over other groups of coordinating atoms, if the current # branch coordinates somewhere in addition to the coordination # from which it eminates then we have a multidentate ligand # which needs special angle handling branch_coordinates = False for other_coordination_idxs in self.coordination_idxs: if other_coordination_idxs == coordination_idxs: continue if set(other_coordination_idxs).issubset( set(branch_idxs)): branch_coordinates = True pair = [coordination_idxs, other_coordination_idxs] vec_idxs, vec_jdxs = map( lambda x: self.getCoordinationVec(self.st, x), pair) bite_angle = math.degrees( transform.get_angle_between_vectors( vec_idxs, vec_jdxs)) cone_angles.append(bite_angle) # if the branch has no other coordination then loop over # branch atoms, find the atom that maximally subtends, and # collect the angle if not branch_coordinates: max_cone_angle = 0 for branch_idx in branch_idxs: branch_atom = st.atom[branch_idx] cone_angle = self.getAtomConeAngle(branch_atom) if cone_angle > max_cone_angle: max_cone_angle = cone_angle cone_angles.append(max_cone_angle) return numpy.mean(cone_angles)
[docs] def getBondLength(self): """ Return the bond length in Ang. :rtype: float :return: the bond length in Ang. """ return numpy.mean([ transform.get_vector_magnitude(self.getCoordinationVec(self.st, x)) for x in self.coordination_idxs ])
[docs] def getDescriptors(self): """ Return descriptors. :rtype: dict :return: (label, data) pairs """ adict = {} adict['stoichiometry'] = self.getStoichiometry() adict['denticity'] = self.getDenticity() adict['hapticity'] = self.getHapticity() adict['haptic_character'] = self.getHapticCharacter() bite_angle = self.getBiteAngle() if bite_angle is not None: adict['bite_angle/deg.'] = bite_angle adict['cone_angle/deg.'] = self.getConeAngle() adict['bond_length/Ang.'] = self.getBondLength() return adict
[docs]class Complex(object): """ Manage a complex. """ BURIED_VOLUME_VDW_SCALE = 1.17 CONTOURS_DIR = 'contours'
[docs] def __init__(self, st, logger=None, nonmetallic_centers=()): """ Create an instance. :type st: `schrodinger.structure.Structure` :param st: the structure :type logger: logging.Logger or None :param logger: output logger or None if there isn't one :param tuple nonmetallic_centers: Tuple of nonmetallic elements to also consider when looking for center atom """ self.st = st self.logger = logger self.nonmetallic_centers = nonmetallic_centers self.metal_atom = None self.ligands = []
[docs] def setMetalAtom(self): """ Set the metal atom. """ metal_idxs = analyze.evaluate_asl(self.st, 'metals') if not metal_idxs and self.nonmetallic_centers: asls = [f'atom.ele {ele}' for ele in self.nonmetallic_centers] asl = ' OR '.join(asls) metal_idxs = analyze.evaluate_asl(self.st, asl) if not metal_idxs: metal_idxs = set() for atom in self.st.atom: n_zob = sum(1 if bond.order == 0 else 0 for bond in atom.bond) if n_zob > 1: metal_idxs.add(atom.index) metal_idxs = list(metal_idxs) if not metal_idxs or len(metal_idxs) > 1: return self.metal_atom = self.st.atom[metal_idxs[0]]
[docs] def setLigands(self): """ Set the ligands. """ self.setMetalAtom() if not self.metal_atom: return [] all_coordination_idxs = [ atom.index for atom in self.metal_atom.bonded_atoms ] st = self.st.copy() for idx in all_coordination_idxs: st.deleteBond(self.metal_atom.index, idx) self.ligands = [] for molecule in st.molecule: all_idxs = molecule.getAtomIndices() ligand = molecule.extractStructure() new_to_old = dict(zip(range(1, ligand.atom_total + 1), all_idxs)) old_to_new = dict([(v, k) for k, v in new_to_old.items()]) coordination_idxs = [ old_to_new[idx] for idx in all_idxs if idx in all_coordination_idxs ] if coordination_idxs: coordination_idxs = analyze.group_by_connectivity( ligand, coordination_idxs) self.ligands.append( Ligand(ligand, self.metal_atom, new_to_old, coordination_idxs))
[docs] def getBondAngle(self): """ Return the bond angle in degrees. :rtype: float :return: the bond angle in degrees """ bond_vectors = [] for ligand in self.ligands: for coordination_idxs in ligand.coordination_idxs: bond_vector = ligand.getCoordinationVec(ligand.st, coordination_idxs) bond_vectors.append(bond_vector) bond_angles = [ transform.get_angle_between_vectors(vec_i, vec_j) for vec_i, vec_j in itertools.combinations(bond_vectors, 2) ] return math.degrees(numpy.mean(bond_angles))
[docs] def getVDWSurfaceArea(self): """ Return the VDW surface area in Angstrom^2. :rtype: float :return: the VDW surface area in Angstrom^2 """ name = 'surface' surf = surface.Surface.newMolecularSurface( self.st, name, resolution=0.5, mol_surf_type=surface.MolSurfType.vdw) return surf.surface_area
[docs] def getVDWVolume(self, vdw_scale=1, buffer_len=2): """ Return the VDW volume in Angstrom^3. :type vdw_scale: float :param vdw_scale: the VDW scale :type buffer_len: float :param buffer_len: a shape buffer lengths in Angstrom :rtype: float :return: the VDW volume in Angstrom^3 """ return amorphous.Scaffold.approximateVolume(self.st, vdw_scale=vdw_scale, seed=123, buffer_len=buffer_len)
[docs] def getBuriedVolumeStructure(self, only_largest_ligands=False): """ Return a copy of the structure without the metal atom. If only_largest_ligands is True, it will only contain the largest ligand or multiple copies thereof if it is symmetric. :param bool only_largest_ligands: Whether small ligands should be deleted :rtype: `schrodinger.structure.Structure` :return: the structure containing some or all ligands """ idxs = [self.metal_atom.index] if only_largest_ligands: max_n_atoms = max(ligand.st.atom_total for ligand in self.ligands) for ligand in self.ligands: if ligand.st.atom_total < max_n_atoms: idxs.extend(ligand.new_to_old[atom.index] for atom in ligand.st.atom) st = self.st.copy() st.deleteAtoms(idxs) return st
[docs] def getBuriedVDWVolumePct(self, struct, vdw_scale=BURIED_VOLUME_VDW_SCALE): """ Return the buried VDW volume percent. :param structure.Structure struct: The structure to get buried volume for :type vdw_scale: float :param vdw_scale: the VDW scale :rtype: float :return: the buried VDW volume percent """ # the algorithm for buried VDW volume percent is defined # in ACS Catal. 2015, 6815 and Eur. J. Inorg. Chem. 2009, 1759, # however here we use a slightly different metal positioning # and a Monte Carlo algorithm volume, percent = amorphous.Scaffold.approximateVolume( struct, vdw_scale=vdw_scale, seed=123, sphere_radius=3.5, sphere_origin=numpy.array(self.metal_atom.xyz), buffer_len=0, return_ratio=True) return percent
[docs] def getFreeVolumeVector(self): """ Return a unit vector pointing from the metal atom of the complex in the direction of free volume. :rtype: numpy.array :return: the free volume unit vector """ max_n_atoms = max(ligand.st.atom_total for ligand in self.ligands) bond_vectors = [] for ligand in self.ligands: if ligand.st.atom_total < max_n_atoms: continue for coordination_idxs in ligand.coordination_idxs: bond_vector = ligand.getCoordinationVec(ligand.st, coordination_idxs) bond_vectors.append(bond_vector) return -1 * transform.get_normalized_vector(sum(bond_vectors))
[docs] def getRotatedComplex(self): """ Return a copy of the complex that is rotated so that the free volume vector points along the positive z-axis. :rtype: `structure.Structure` :return: A rotated copy of the input structure """ struct = self.st.copy() transform.translate_atom_to_origin(struct, struct.atom[self.metal_atom.index]) vec = self.getFreeVolumeVector() # see MATSCI-11215 where symmetric TiCl4 results in a vector of # zero length, in this case do not rotate if transform.get_vector_magnitude(vec) > 0: rot_matrix = transform.get_alignment_matrix(vec, Z_VEC) transform.transform_structure(struct, rot_matrix) return struct
[docs] def exportBuriedVolumeContour(self, sphere_radius=3.5, vdw_scale=BURIED_VOLUME_VDW_SCALE, num_bins=30, seed=1234): """ Export the buried volume contour for the complex :param float sphere_radius: The radius for the sphere to sample points in :param float vdw_scale: The VdW scale factor to apply to VdW radii when checking to see if a point is "inside" an atom :param int num_bins: The number of bins in x and y direction to put the points in :param int seed: Seed for random number generation :rtype: str, str :return: The paths to contour png and csv files """ if seed is not None: my_random = random.Random() my_random.seed(seed) else: my_random = random struct = self.getRotatedComplex() max_rad = amorphous.get_maximum_vdw_radius(struct) max_rad *= vdw_scale d_cell, _ = cstruct.create_distance_cell(struct, max_rad) bins = defaultdict(dict) bin_edges = numpy.linspace(-sphere_radius, sphere_radius, num=num_bins + 1, endpoint=True) num_tries = max(3000 * struct.atom_total, 200000) for _ in range(num_tries): vec = [my_random.gauss(0, 1) for _ in range(3)] coords = (sphere_radius * my_random.uniform(0, 1)** (1. / 3.)) * transform.get_normalized_vector(vec) neighbor_atoms = [ struct.atom[match.getIndex()] for match in d_cell.query_atoms(*coords) ] for atom in neighbor_atoms: dist = transform.get_vector_magnitude(atom.xyz - coords) if dist <= atom.radius * vdw_scale: # idx is 1 based. 0 means less than first bin edge x_idx = numpy.digitize(coords[0], bin_edges) y_idx = numpy.digitize(coords[1], bin_edges) # Store the point with the highest Z value in each bin if (y_idx not in bins[x_idx] or bins[x_idx][y_idx][2] < coords[2]): bins[x_idx][y_idx] = coords break contour_points = [] for inner_dict in bins.values(): for point in inner_dict.values(): contour_points.append(point) contour_points = numpy.array(contour_points) self.plotContour(contour_points) os.makedirs(self.CONTOURS_DIR, exist_ok=True) title = getattr(self.st, 'unique_title', None) or self.st.title png_path = os.path.join(self.CONTOURS_DIR, f'{title}.png') self.canvas.print_figure(png_path) jobutils.add_outfile_to_backend(png_path) csv_path = os.path.join(self.CONTOURS_DIR, f'{title}.csv') numpy.savetxt(csv_path, contour_points, header='X (A), Y (A), Z (A)', delimiter=',') jobutils.add_outfile_to_backend(csv_path) return png_path, csv_path
[docs] def plotContour(self, points): """ Plot a contour for the passed points. matplotlib uses triangulation to create a grid for the contour. :param `numpy.array` points: The x, y, z values of points """ from matplotlib import figure from matplotlib.backends.backend_agg import FigureCanvasAgg fig = figure.Figure(figsize=(10, 7.5), dpi=200) self.canvas = FigureCanvasAgg(fig) axes = fig.add_subplot(111) fig.subplots_adjust(left=0.1, right=0.95) angstrom_unit = f' ({msconst.ANGSTROM_UNICODE})' axes.set_xlabel('X' + angstrom_unit) axes.set_ylabel('Y' + angstrom_unit) cmap = matplotlib.cm.get_cmap('jet') contour = axes.tricontourf(points[:, 0], points[:, 1], points[:, 2], 15, cmap=cmap) color_bar = fig.colorbar(contour) color_bar.set_label('Z' + angstrom_unit) self.canvas.draw()
[docs] def getVectorizedDescriptors(self, jaguar_out_file): """ Return vectorized descriptors which are instance specific descriptors that are vectorized, for example depending on the molecule's geometric orientation, atom indexing, etc. :type jaguar_out_file: str or None :param jaguar_out_file: the name of a Jaguar `*.out` file from which descriptors will be extracted or None if there isn't one :rtype: dict :return: (label, data) pairs """ # these properties are instance dependent and should be reduced # to a single hash (potentially non-physical), that can be used to # quantify the differences between different molecules, prior to # use in building machine learning models adict = {} if jaguar_out_file is None: return adict base_cmd = ['jaguar', 'results', jaguar_out_file] for moment_data in MOMENTS_DATA: cmd = base_cmd + [moment_data.flag] try: output = subprocess.check_output(cmd) except subprocess.CalledProcessError as err: if self.logger: self.logger.warning(f'The following command fails: {cmd}.') self.logger.warning(str(err)) self.logger.info('') continue output = output.strip() # if you query non-existing data then you get 'N/A' if not output or output == 'N/A': continue try: output = list(map(float, output.split())) except ValueError: continue for component, value in zip(moment_data.components, output): key = moment_data.header + '_' + component + '/' + moment_data.units adict[key] = value return adict
[docs] def getDescriptors(self, no_organometallic=False): """ Return descriptors. :param bool no_organometallic: Whether organometallic descriptors should be skipped :rtype: dict :return: (label, data) pairs """ adict = {} adict['vdw_surface_area/Ang.^2'] = self.getVDWSurfaceArea() adict['vdw_volume/Ang.^3'] = self.getVDWVolume() if no_organometallic: return adict self.setLigands() if self.ligands: for idx, ligand in enumerate(self.ligands, 1): base_label = f'ligand_{idx}_' for label, datum in ligand.getDescriptors().items(): adict[base_label + label] = datum adict['bond_angle/deg.'] = self.getBondAngle() adict['buried_volume_%_all_ligands'] = self.getBuriedVDWVolumePct( self.getBuriedVolumeStructure()) adict[ 'buried_volume_%_largest_ligand'] = self.getBuriedVDWVolumePct( self.getBuriedVolumeStructure(only_largest_ligands=True)) png_path, csv_path = self.exportBuriedVolumeContour() adict['buried_volume_contour'] = png_path adict['buried_volume_csv'] = csv_path return adict
[docs]def get_unique_titles(sts): """ Return a list of unique titles for the given structures. :type sts: list :param sts: contains `schrodinger.structure.Structure` :rtype: list :return: the unique titles """ cleaner = jobutils.StringCleaner(separator='.') titles = [] for st in sts: title = st.title.strip() or 'structure' titles.append(cleaner.cleanAndUniquify(title)) return titles
[docs]class ComplexFeatures(BaseFeaturizer): """ Class to generate features for metal complexes. """
[docs] def __init__(self, jaguar=False, jaguar_keywords=rxnwfu.DEFAULT_JAGUAR_KEYWORDS, tpp=rxnwfu.DEFAULT_TPP, ligfilter=False, no_organometallic=False, nonmetallic_centers=(), canvas=False, moldescriptors=False, include_vectorized=False, save_files=False, logger=None): """ Create an instance. :type jaguar: bool :param jaguar: specify whether to calculate Jaguar features :type jaguar_keywords: OrderedDict :param jaguar_keywords: if Jaguar jobs must be run to calculate the Jaguar features then specify the Jaguar keywords here :type tpp: int :param tpp: the number of threads for any Jaguar jobs :type ligfilter: bool :param ligfilter: specify whether to calculate Ligfilter features :type no_organometallic: bool :param no_organometallic: Whether organometallic descriptors should be skipped :type canvas: bool :param canvas: specify whether to calculate Canvas features :type moldescriptors: bool or list :param moldescriptors: specify whether to calculate Molecular Descriptors features. If it's a list, it contains command line arguments for moldescriptors :type include_vectorized: bool :param include_vectorized: whether to include instance specific features that are vectorized, for example depending on the molecule's geometric orientation, atom indexing, etc. :param bool save_files: Whether to save subjob files or not :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ self.jaguar = jaguar self.jaguar_keywords = jaguar_keywords self.tpp = tpp self.ligfilter = ligfilter self.no_organometallic = no_organometallic self.nonmetallic_centers = nonmetallic_centers self.canvas = canvas self.moldescriptors = moldescriptors self.include_vectorized = include_vectorized self.save_files = save_files self.logger = logger
[docs] def runJaguar(self): """ Run Jaguar on the given structures. :rtype: list :return: contains Jaguar `*.out` file names """ options = argparse.Namespace(TPP=self.tpp) job_dj = jobutils.create_queue(options=options, host=jobutils.AUTOHOST) max_atoms = jaguarworkflows.get_jaguar_max_atoms() titles = [] to_skip = set() for st in self.structs: title = st.unique_title titles.append(title) if st.atom_total > max_atoms: to_skip.add(title) continue # handle restarts if not jaguar_restart.needs_restart(title): continue _st = st.copy() kwargs = {'structure': _st} ji = JaguarInput(**kwargs) kwargs = self.jaguar_keywords.copy() kwargs.update(RESERVED_JAGUAR_KEYWORDS) ji.setValues(kwargs) ji.setStructure(_st) if ji.sectionDefined('valid'): ji.deleteSection('valid') in_file = title + '.in' ji.saveAs(in_file) cmd = ['jaguar', 'run', '-TPP', str(self.tpp), in_file] job_dj.addJob(jobutils.RobustSubmissionJob(cmd)) if job_dj.total_added: self.log('Running Jaguar jobs...', pad=True) job_dj.run() out_files = [] for title in titles: if title in to_skip: out_files.append(None) continue if self.save_files: jaguarworkflows.add_jaguar_files_to_jc_backend(title) out_file = title + '.out' if not os.path.exists(out_file): out_files.append(None) continue jag_out = JaguarOutput(out_file) if jag_out.status != JaguarOutput.OK or jag_out.fatal_error: out_files.append(None) continue out_files.append(out_file) return out_files
[docs] def getFeatures(self, structs, jaguar_out_files=None): """ Return features dictionary for the given structures :type structs: list(`schrodinger.structure.Structure`) :param structs: list of structures to be featurized :type jaguar_out_files: list or None :param jaguar_out_files: if Jaguar features should be calculated using existing Jaguar `*.out` files then specify the files here using the same ordering as used for any given structures """ self.structs = structs self.jaguar_out_files = jaguar_out_files self.struct_file = None # Set titles on structures titles = get_unique_titles(structs) for struct, unique_title in zip(structs, titles): struct.unique_title = unique_title if self.jaguar: self.verifyJaguarOutfiles() # An empty dict should be returned if no descriptors were calculated # for a structure. The order is the same as that of input structures all_descriptors = {title: {} for title in titles} descriptors_dicts = [ self.getComplexDescriptors(), self.getJaguarDescriptors(), self.getUtilityDescriptors() ] # Merge all dictionaries for title in titles: for descriptors in descriptors_dicts: if title in descriptors: all_descriptors[title].update(descriptors[title]) return all_descriptors
[docs] def verifyJaguarOutfiles(self): """ Run jaguar and get the out-files if the out-files have not been provided """ if self.jaguar_out_files: assert len(self.jaguar_out_files) == len(self.structs) return max_atoms = jaguarworkflows.get_jaguar_max_atoms() to_skip = [st for st in self.structs if st.atom_total > max_atoms] all_skipped = len(to_skip) == len(self.structs) if to_skip: self.log('Jaguar descriptors for input structures with more ' f'than {max_atoms} atoms will be skipped.') if all_skipped: self.log('Skipping all Jaguar descriptors.') if not all_skipped: jaguar_out_files = self.runJaguar() all_failed = all(out is None for out in jaguar_out_files) if all_failed: self.log( 'All Jaguar jobs failed. Skipping all Jaguar descriptors.') else: jaguar_out_files = [None] * len(self.structs) self.jaguar_out_files = jaguar_out_files
[docs] def getComplexDescriptors(self): """ Create a `Complex` object for each structure and get their descriptors :rtype: dict :return: The descriptors from `Complex` for each structure """ descriptors = {} # Use Agg rather than QTAgg for contours to prevent requiring graphics # libraries in case running on a remote compute host old_backend = matplotlib.get_backend() for idx, struct in enumerate(self.structs): acomplex = Complex(struct, logger=self.logger, nonmetallic_centers=self.nonmetallic_centers) st_dict = acomplex.getDescriptors( no_organometallic=self.no_organometallic) if self.include_vectorized: out_file = self.jaguar_out_files[idx] st_dict.update(acomplex.getVectorizedDescriptors(out_file)) descriptors[struct.unique_title] = st_dict matplotlib.use(old_backend) return descriptors
[docs] def getJaguarDescriptors(self): """ Return Jaguar descriptors for all structures. Sets Jaguar atom descriptors on structures. :rtype: dict :return: The jaguar descriptors for all structures. Keys are structure titles and values are dictionaries containing descriptors """ if not self.jaguar or all(out is None for out in self.jaguar_out_files): return {} out_indexes = [] for idx, out_file in enumerate(self.jaguar_out_files): if out_file is not None: out_indexes.append(idx) job_name = 'qm_descriptors' job_dj = jobutils.create_queue(host=jobutils.AUTOHOST) cmd = [ 'jaguar', 'run', 'qm_descriptors.py', '-formats', 'mae', '-jobname', job_name, '-outs' ] + [self.jaguar_out_files[idx] for idx in out_indexes] job_dj.addJob(jobutils.RobustSubmissionJob(cmd)) self.log('Running job for Jaguar descriptors...', pad=True) job_dj.run() out_file = 'all_calcs.props.mae' if self.save_files: jobutils.add_outfile_to_backend(job_name + '.log') jobutils.add_outfile_to_backend(out_file) if not os.path.exists(out_file): self.log("Could not find Jaguar descriptor results. " + SKIPPING_MSG) return {} out_sts = list(structure.StructureReader(out_file)) if len(out_sts) != len(out_indexes): self.log(INPUT_OUTPUT_ERROR.format('Jaguar descriptors')) return {} descriptors = {} for idx, in_st in enumerate(self.structs): if idx not in out_indexes: continue out_st = out_sts.pop(0) # Structure descriptors descriptors[in_st.unique_title] = { key: val for key, val in out_st.property.items() if key[1:4] == JAGUAR_PROP_FAMILY } if len(in_st.atom) != len(out_st.atom): self.log('Different number of atoms in input and output of ' f'Jaguar descriptors for structure {idx + 1} ' + f'({in_st.title}). Atom descriptors will be skipped.') continue # Atom descriptors are set directly on the input structures for in_atom, out_atom in zip(in_st.atom, out_st.atom): in_atom.property.update({ key: val for key, val in out_atom.property.items() if key[1:4] == JAGUAR_PROP_FAMILY }) return descriptors
[docs] def getUtilityDescriptors(self): """ Get the requested utility descriptors for all structures :rtype: dict :return: The descriptor utility descriptors for all structures. Keys are structure titles and values are dictionaries containing descriptors """ jobs_dict = {} if self.ligfilter: jobs_dict[LIGFILTER_DU.descriptor_header] = \ self.getDescriptorUtilityJob(LIGFILTER_DU) if self.canvas: jobs_dict[CANVAS_DU.descriptor_header] = \ self.getDescriptorUtilityJob(CANVAS_DU) mol_desc_job = self.getMolecularDescriptorsJob() if mol_desc_job is not None: jobs_dict[MOLDESCRIPTORS_DU.descriptor_header] = mol_desc_job if not jobs_dict: return {} job_dj = jobutils.create_queue(host=jobutils.AUTOHOST) for job in jobs_dict.values(): job_dj.addJob(job) self.log('Running descriptor utilities: ' + ', '.join(jobs_dict.keys()), pad=True) job_dj.run() return self.processUtilityDescriptorOutputs(jobs_dict)
[docs] def getDescriptorUtilityJob(self, descriptor_utility): """ Get the job to run to generate the descriptors using the passed descriptor_utility for all structures :param `DescriptorUtility` descriptor_utility: The descriptor utility to run to get the descriptors :rtype: `jobutils.RobustSubmissionJob` :return: The job to run to generate the descriptors """ if self.struct_file is None: self.struct_file = 'structures-in.mae' with structure.StructureWriter(self.struct_file) as writer: writer.extend(self.structs) in_file = descriptor_utility.descriptor_header + '-in.mae' shutil.copy(self.struct_file, in_file) out_file = descriptor_utility.descriptor_header + '-out.mae' cmd = [descriptor_utility.script_name] cmd += descriptor_utility.other_flags if descriptor_utility.in_mae_flag: cmd += [descriptor_utility.in_mae_flag, in_file] else: cmd += [in_file] cmd += [descriptor_utility.out_mae_flag, out_file] return descriptor_utility.job_class(cmd)
[docs] def processUtilityDescriptorOutputs(self, jobs_dict): """ Read the descriptors for all descriptor utilities that were run, and return them :type jobs_dict: dict :param jobs_dict: Dictionary with `DescriptorUtility` as keys and jobs as values :rtype: dict :return: The descriptor utility descriptors for all structures. Keys are structure titles and values are dictionaries containing descriptors """ descriptors = defaultdict(dict) for descriptor_utility in [LIGFILTER_DU, CANVAS_DU, MOLDESCRIPTORS_DU]: du_name = descriptor_utility.descriptor_header if du_name not in jobs_dict: continue job = jobs_dict[du_name] out_file = du_name + '-out.mae' if self.save_files: if descriptor_utility.job_class == queue.SubprocessJob: jobutils.add_outfile_to_backend(out_file) else: jobutils.add_subjob_files_to_backend(job) if not os.path.exists(out_file): self.log(f'Could not find descriptor results for {du_name}. ' + SKIPPING_MSG) continue if structure.count_structures(out_file) != len(self.structs): self.log(INPUT_OUTPUT_ERROR.format(du_name)) continue with structure.StructureReader(out_file) as reader: for in_st, out_st in zip(self.structs, reader): st_dict = {} for key, value in out_st.property.items(): parts = key.split('_') if len(parts) < 3: continue # Incorrect property name if parts[1] in descriptor_utility.property_families: st_dict[key] = value descriptors[in_st.unique_title].update(st_dict) return descriptors
[docs] def getMolecularDescriptorsJob(self): """ Get the job to run to generate molecular descriptors for all structures :rtype: `jobutils.RobustSubmissionJob` :return: The job to run to generate the descriptors """ if not self.moldescriptors: return None if self.moldescriptors is True: other_flags = list(MOLDESCRIPTORS_DU.other_flags) else: # moldescriptors is a list of command line flags other_flags = list(self.moldescriptors) if self.save_files: other_flags += ['-savefiles'] other_flags += ['-mopac_keywords', 'super'] # MATSCI-9017 other_flags += ['-skipfailed'] # MATSCI-9580 utility = MOLDESCRIPTORS_DU._replace(other_flags=other_flags) return self.getDescriptorUtilityJob(utility)
[docs] @staticmethod def writeFingerprintFiles(structs): """ Write fingerprint files for the given structures. :type structs: list(`schrodinger.structure.Structure`) :param structs: list of structures to be fingerprinted :rtype: list :return: the fingerprint file names """ sts, props = [], [] obj = go.CanvasKPLS(sts, props) titles = get_unique_titles(structs) name = 'dummy' precision = obj.SINGLE_PRECISION atom_types = None fp_files = [] for title, st in zip(titles, structs): for fp_type in obj.ALLOWED_FP_TYPES: mae_file = title + f'.{fp_type}{go.IN_MAE_EXT}' st.write(mae_file) obj.fp_options_dict = {name: (precision, fp_type, atom_types)} try: fp_file = obj.makeFingerPrintInfile(mae_file, name) except RuntimeError: continue fp_files.append(fp_file) fileutils.force_remove(mae_file) return fp_files
[docs] def log(self, msg, **kwargs): """ Add a message to the log file :param str msg: The message to log Additional keyword arguments are passed to the textlogger.log_msg function """ textlogger.log_msg(msg, logger=self.logger, **kwargs)
[docs]class CrystalNNFeatures: """ Calculates CrystalNN structure fingerprints as implemented in pymatgen """ OPS_PRESET = 'ops' CN_PRESET = 'cn'
[docs] def __init__(self, preset=OPS_PRESET): """ Create a structure featurizer :param str preset: One of `OPS_PRESET` or `CN_PRESET` class constants """ site_featurizer = CrystalNNFingerprint.from_preset( preset, distance_cutoffs=None, x_diff_weight=0) self.st_featurizer = SiteStatsFingerprint(site_featurizer, stats=('mean', 'maximum'))
[docs] def featurize(self, struct): """ Get CrystalNN fingerprints for the passed structure :param `structure.Structure` The structure to get features for :rtype: list :return: List of CrystalNN fingerprints for the structure """ if not xtal.has_pbc(struct): raise ValueError( f'"{struct.title}" has no periodic boundary conditions.') pmg_struct = xtal.get_pymatgen_structure(struct) return self.st_featurizer.featurize(pmg_struct)