Source code for schrodinger.application.matsci.kmc

"""
Utilities for working with VOTCA

Copyright Schrodinger, LLC. All rights reserved.
"""

import os
import pathlib
import sqlite3
from collections import OrderedDict
from collections import namedtuple

from schrodinger.application.desmond import cms
from schrodinger.application.matsci import clusterstruct
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci.nano import xtal
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import transform

# Used as the default of the 'version' column of the schrodinger table
DB_VERSION = 0.92

# File and name suffixes (their consumers are not shown in this section)
TOPO_ENDING = '_topo.xml'
PDB_ENDING = '.pdb'
MAP_ENDING = '_map.xml'
SQL_ENDING = '.sql'
SEGNAME_ENDING = '_seg'
FRAGNAME_ENDING = '_frag'
TOPOLOGY = 'topology'
MOLECULES = 'molecules'
PARTIAL_RESULTS = 'partial'
# Charge carrier names, lowercase and capitalized forms
HOLE = 'hole'
ELECTRON = 'electron'
CHARGE_TYPES = (HOLE, ELECTRON)
CAP_HOLE = 'Hole'
CAP_ELECTRON = 'Electron'
# One-letter suffix for each charge carrier type
H_ENDING = 'h'
E_ENDING = 'e'
CHARGE_ENDINGS = {HOLE: H_ENDING, ELECTRON: E_ENDING}
# Axis names and the mapping from axis name to 0-based axis index
XAX = 'X'
YAX = 'Y'
ZAX = 'Z'
ALL_AXES = [XAX, YAX, ZAX]
AXIS_INDEX = {b: a for a, b in enumerate(ALL_AXES)}
# Make AXIS_INDEX work whether the axis name or index is passed in. That way we
# don't have to preprocess to ensure the axis name is being used as the key.
AXIS_INDEX.update({a: a for a in range(3)})
# Property type identifiers returned by is_votca_prop
MOBILITY_TYPE = 'mobility'
VELOCITY_TYPE = 'velocity'
FIELD_TYPE = 'field'
DATABASE_TYPE = 'database'

# VOTCA SQL column names
# Coupling integral squared
JEFF = 'Jeff2'
# Hopping rate
RATE = 'rate'
# Charge occupation fraction
OCCUPATION = 'occP'

# Default value of the occPe/occPh columns of the segments table
DEFAULT_OCCUPANCY = -1

# SQL data types
INT = 'INT'
REAL = 'REAL'
TEXT = 'TEXT'
# Column values
SQL_NOJOB = 'no_job'
SQL_HEAVY = 'heavy'
SQL_ALL = 'all'
SQL_DEPRECATED_NONE = 'none'  # Retained for backwards compatibility
# Placeholder defaults for the text columns of the segmentTypes table
SQL_NOFILE = 'nofile'
SQL_NONAME = 'noname'
SQL_NOT_USED = 'NOT_USED'

# Properties
# Prefixes shared by the property names built below; is_votca_prop keys off
# these prefixes to recognize VOTCA properties.
VOTCA_PROP_START = 'r_matsci_KMC_F'
VOTCA_SPROP_START = 's_matsci_KMC_F'
# The prefix is substituted immediately via %; the {field}, {charge} and
# {axis} placeholders are left in the templates for later substitution.
VELOCITY_PROP = '%s{field}_{charge}_Velocity_{axis}_(m/s)' % VOTCA_PROP_START
MOBILITY_PROP = ('%s{field}_{charge}_Mobility_{axis}_(cm^2/Vs)' %
                 VOTCA_PROP_START)
# VOTCA_PROP_START ends in 'F', so '%sield' expands to '..._KMC_Field_...'.
# is_votca_prop relies on the resulting '_Field_' substring.
FIELD_PROP = '%sield_{field}_{axis}_(V/m)' % VOTCA_PROP_START
SQL_FILE = '%s{field}_{charge}_Database' % VOTCA_SPROP_START
PARAM_SQL_FILE = 's_matsci_KMC_Hopping_Params_Database'
VOTCA_JOB_ID = 's_matsci_KMC_Job_ID'

# Description of one SQL column: its SQL data type and its default value (a
# default of None marks a column that has no default)
ColumnData = namedtuple('ColumnData', ['type', 'default'])
# Per-molecule values shared by several tables (see Table.getMoleculeInfo)
MoleculeData = namedtuple('MoleculeData',
                          ['index', 'name', 'mtype', 'posx', 'posy', 'posz'])


def is_votca_prop(prop):
    """
    Classify a property name as one of the VOTCA property types

    :param str prop: The property name to classify

    :rtype: str or None
    :return: MOBILITY_TYPE, VELOCITY_TYPE or FIELD_TYPE for a name that starts
        with VOTCA_PROP_START and contains the matching marker, DATABASE_TYPE
        for a name that starts with VOTCA_SPROP_START and ends with
        '_Database', and None for anything else.
    """
    if prop.startswith(VOTCA_PROP_START):
        # Markers are checked in priority order; the first hit wins
        markers = (
            ('_Mobility_', MOBILITY_TYPE),
            ('_Velocity_', VELOCITY_TYPE),
            ('_Field_', FIELD_TYPE),
        )
        for marker, prop_type in markers:
            if marker in prop:
                return prop_type
        return None
    if prop.startswith(VOTCA_SPROP_START) and prop.endswith('_Database'):
        return DATABASE_TYPE
    return None
def parse_mobility_or_velocity_prop(prop):
    """
    Extract the field index, charge and axis from a VOTCA mobility or velocity
    property name

    :param str prop: The property name to parse

    :rtype: (int, str, str) or None
    :return: The field index, the charge token and the axis name, in that
        order. None if is_votca_prop does not classify the property as a
        mobility or velocity property.
    """
    kind = is_votca_prop(prop)
    if kind != MOBILITY_TYPE and kind != VELOCITY_TYPE:
        return None
    # With the prefix removed the name reads
    # {field}_{charge}_{Mobility|Velocity}_{axis}_{units}
    pieces = prop.replace(VOTCA_PROP_START, "").split('_')
    return int(pieces[0]), pieces[1], pieces[3]
def parse_field_prop(prop):
    """
    Extract the field index and axis from a VOTCA field property name

    :param str prop: The property name to parse

    :rtype: (int, str) or None
    :return: The field index and the axis name. None if is_votca_prop does not
        classify the property as a field property.
    """
    if is_votca_prop(prop) != FIELD_TYPE:
        return None
    # The full name is split on '_', so the field index is the fifth token
    # and the axis name the sixth
    pieces = prop.split('_')
    return int(pieces[4]), pieces[5]
def parse_database_prop(prop):
    """
    Extract the field index and charge from a VOTCA database property name

    :param str prop: The property name to parse

    :rtype: (int, str) or None
    :return: The field index and the charge token. None if is_votca_prop does
        not classify the property as a database property.
    """
    if is_votca_prop(prop) != DATABASE_TYPE:
        return None
    # With the prefix removed the name reads {field}_{charge}_Database
    pieces = prop.replace(VOTCA_SPROP_START, "").split('_')
    return int(pieces[0]), pieces[1]
class AxisData(object):
    """
    Container for a value that has one component per axis (X, Y and Z)
    """

    def __init__(self):
        """
        Create an AxisData object with all three components set to 0
        """
        self.components = [0, 0, 0]

    def setComponent(self, axis, value):
        """
        Store the value for a single axis

        :type axis: str or int
        :param axis: Either the capital name of an axis (X, Y, Z) or the
            numerical index of that axis

        :param value: The value to store for that axis
        """
        slot = AXIS_INDEX[axis]
        self.components[slot] = value
class SQLCursor(object):
    """
    Context manager for reading or modifying an SQL database. Ensures that
    changes are committed and the cursor/connection are closed when finished.::

        cmd = "black SQL magic"
        with SQLCursor(path_to_sql_file) as cursor:
            cursor.execute(cmd)

    Note that the commit happens on exit whether or not the body of the with
    statement raised an exception.
    """

    def __init__(self, path):
        """
        Create an SQLCursor instance

        :type path: str or pathlib.Path
        :param path: The path to the SQL file. It is converted to str.
        """
        self.path = str(path)
        self.connection = None
        self.cursor = None

    def __enter__(self):
        """
        Open the connection and create the cursor

        :rtype: sqlite3.Cursor
        :return: A cursor whose rows are returned as sqlite3.Row objects
        """
        self.connection = sqlite3.connect(self.path)
        self.cursor = self.connection.cursor()
        self.cursor.row_factory = sqlite3.Row
        return self.cursor

    def __exit__(self, *args):
        """
        Commit pending changes and close the cursor and the connection

        The cursor and connection are closed even if the commit fails so that
        the database file is never left open.
        """
        try:
            self.connection.commit()
        finally:
            try:
                self.cursor.close()
            finally:
                self.connection.close()
class Table(object):
    """
    Base table class for VOTCA SQL tables

    Subclasses define TABLE_NAME and COLUMNS, where COLUMNS maps each column
    name to an object with `type` and `default` attributes. A cursor must be
    supplied with `setCursor` before `create` or `_addRow` is called.
    """

    TABLE_NAME = ""
    COLUMNS = OrderedDict()
    CREATION_COMMAND = 'CREATE TABLE {name} ({columns});'
    ADDROW_COMMAND = 'INSERT INTO {name} ({cols}) VALUES ({ph})'
    # When False, columns without a default are created NOT NULL and a value
    # must be supplied for them when a row is added
    NULL_ALLOWED = False

    # Column names
    SQL_ID = '_id'
    SQID = 'id'
    FRAME = 'frame'
    TOP = 'top'
    NAME = 'name'
    TYPE = 'type'
    MOL = 'mol'
    SEG = 'seg'
    POSX = 'posX'
    POSY = 'posY'
    POSZ = 'posZ'

    def __init__(self, filename):
        """
        Create a Table instance

        :type filename: str
        :param filename: The path to the SQL file
        """
        self.filename = filename
        self.cursor = None

    def setCursor(self, cursor):
        """
        Set the SQL cursor this table should use

        :type cursor: sqlite3.Cursor
        :param cursor: The cursor to use for database read/writes
        """
        self.cursor = cursor

    def create(self):
        """
        Create this table in the database

        :raise `SQLCreationError`: If the cursor is not defined
        """
        if not self.cursor:
            raise SQLCreationError(
                f'Table {self.TABLE_NAME} cannot be created without a cursor.')
        # This means that a new _id will automatically be created for each new
        # row and the value of this will increase by one each time
        colinfo = [f'{self.SQL_ID} INTEGER PRIMARY KEY AUTOINCREMENT']
        for name, data in self.COLUMNS.items():
            if not self.NULL_ALLOWED and data.default is None:
                # Columns with no default will require a value
                constraint = 'NOT NULL'
            else:
                constraint = 'DEFAULT %s' % data.default
            colinfo.append('%s %s %s' % (name, data.type, constraint))
        cmd = self.CREATION_COMMAND.format(name=self.TABLE_NAME,
                                           columns=', '.join(colinfo))
        self.cursor.execute(cmd)

    def _addRow(self, props):
        """
        Add a row to this table

        :type props: dict
        :param props: Non-default row values. Keys are column names, values are
            the value for that column.

        :raise `SQLCreationError`: If the cursor is not defined
        :raise KeyError: If a key of props is not a column of this table
        :raise ValueError: If NULL_ALLOWED is False and a column ends up with
            no value
        """
        if not self.cursor:
            raise SQLCreationError(
                f'Table {self.TABLE_NAME} cannot add a row without a cursor.')

        # Start from the default value of every column, in column order
        newline_data = OrderedDict(
            (name, data.default) for name, data in self.COLUMNS.items())

        # Update with caller-supplied values
        for name, value in props.items():
            if name not in newline_data:
                raise KeyError('%s is not a value column name for %s' %
                               (name, self.TABLE_NAME))
            newline_data[name] = value

        if not self.NULL_ALLOWED:
            # Ensure that all required values are supplied
            for name, value in newline_data.items():
                if value is None:
                    raise ValueError('A value for %s must be supplied for %s' %
                                     (name, self.TABLE_NAME))

        # Values are passed as parameters rather than formatted into the SQL
        colnames = ', '.join(newline_data.keys())
        placeholders = ', '.join(['?'] * len(newline_data))
        cmd = self.ADDROW_COMMAND.format(name=self.TABLE_NAME,
                                         cols=colnames,
                                         ph=placeholders)
        self.cursor.execute(cmd, tuple(newline_data.values()))

    def getMoleculeInfo(self, molecule, centroid=True):
        """
        Get common database information for a molecule object

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to get information for

        :type centroid: bool
        :param centroid: Include information about the molecule's centroid. If
            False, all centroid information will be 0.

        :rtype: `MoleculeData`
        :return: A MoleculeData object containing the information
        """
        if centroid:
            center = transform.get_centroid(molecule.structure,
                                            molecule.getAtomIndices())
        else:
            center = [0, 0, 0]
        name = f'{self.getSegmentType(molecule)}_{molecule.number}'
        # Unlike the segment type, mtype is the unstripped residue name
        mtype = molecule.atom[1].pdbres
        # Positions are stored in nanometers
        return MoleculeData(index=molecule.number,
                            name=name,
                            mtype=mtype,
                            posx=center[0] / 10.,
                            posy=center[1] / 10.,
                            posz=center[2] / 10.)

    @staticmethod
    def getSegmentType(molecule):
        """
        Get the segment type for this molecule

        Segment names will be type_X, where X is the molecule number

        :param `structure._Molecule` molecule: The molecule object

        :rtype: str
        :return: The segment type for this molecule, which is the stripped
            pdbres of the molecule's first atom
        """
        return molecule.atom[1].pdbres.strip()

    @staticmethod
    def getAllSegmentTypes(struct):
        """
        Get all the segment types for this structure

        :param `structure.Structure` struct: The structure object

        :rtype: set
        :return: Each item of the set is the name of a segment type
        """
        return {Table.getSegmentType(x) for x in struct.molecule}
class FramesTable(Table):
    """
    The frames table. A row holds the nine components of the periodic
    boundary condition box.
    """

    TABLE_NAME = 'frames'

    # Column names
    TIME = 'time'
    STEP = 'step'
    BOX11 = 'box11'
    BOX12 = 'box12'
    BOX13 = 'box13'
    BOX21 = 'box21'
    BOX22 = 'box22'
    BOX23 = 'box23'
    BOX31 = 'box31'
    BOX32 = 'box32'
    BOX33 = 'box33'
    CANRIGID = 'canRigid'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.SQID, ColumnData(type=INT, default=0)),
        (TIME, ColumnData(type=REAL, default=0)),
        (STEP, ColumnData(type=INT, default=0)),
        (BOX11, ColumnData(type=REAL, default=None)),
        (BOX12, ColumnData(type=REAL, default=None)),
        (BOX13, ColumnData(type=REAL, default=None)),
        (BOX21, ColumnData(type=REAL, default=None)),
        (BOX22, ColumnData(type=REAL, default=None)),
        (BOX23, ColumnData(type=REAL, default=None)),
        (BOX31, ColumnData(type=REAL, default=None)),
        (BOX32, ColumnData(type=REAL, default=None)),
        (BOX33, ColumnData(type=REAL, default=None)),
        (CANRIGID, ColumnData(type=INT, default=0)),
    ])
    # yapf: enable

    def addRow(self, struct):
        """
        Add a frame row to the table

        The main frame information is the PBC box

        :type struct: `schrodinger.structure.Structure`
        :param struct: The structure with the PBC information

        :raise `SQLCreationError`: If reading the PBC properties of the
            structure raises a KeyError
        """
        try:
            chorus = xtal.get_chorus_properties(struct)
        except KeyError as err:
            raise SQLCreationError(
                'The given structure is missing a required PBC property:\n ' +
                str(err))
        box_columns = (self.BOX11, self.BOX12, self.BOX13, self.BOX21,
                       self.BOX22, self.BOX23, self.BOX31, self.BOX32,
                       self.BOX33)
        # Box size is stored in nanometers
        row = {column: val / 10. for column, val in zip(box_columns, chorus)}
        self._addRow(row)
class MoleculesTable(Table):
    """
    The molecules table. Each row holds the index, name and type of one
    molecule.
    """

    TABLE_NAME = 'molecules'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
    ])
    # yapf: enable

    def addRow(self, molecule):
        """
        Add the row for one molecule

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to add a row for
        """
        # No position columns in this table, so the centroid is not computed
        info = self.getMoleculeInfo(molecule, centroid=False)
        self._addRow({
            self.SQID: info.index,
            self.NAME: info.name,
            self.TYPE: info.mtype,
        })
class SegmentsTable(Table):
    """
    The segments table. Each row holds one molecule with its centroid
    position; the remaining columns take their default values when the row is
    added.
    """

    TABLE_NAME = 'segments'

    # Column names
    UNCNNE = 'UnCnNe'
    UNCNNH = 'UnCnNh'
    UCNCCE = 'UcNcCe'
    UCNCCH = 'UcNcCh'
    UCCNNE = 'UcCnNe'
    UCCNNH = 'UcCnNh'
    EANION = 'eAnion'
    ENEUTRAL = 'eNeutral'
    ECATION = 'eCation'
    HAS_E = 'has_e'
    HAS_H = 'has_h'
    OCCPE = 'occPe'
    OCCPH = 'occPh'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
        (Table.MOL, ColumnData(type=INT, default=None)),
        (Table.POSX, ColumnData(type=REAL, default=None)),
        (Table.POSY, ColumnData(type=REAL, default=None)),
        (Table.POSZ, ColumnData(type=REAL, default=None)),
        (UNCNNE, ColumnData(type=REAL, default=0)),
        (UNCNNH, ColumnData(type=REAL, default=0)),
        (UCNCCE, ColumnData(type=REAL, default=0)),
        (UCNCCH, ColumnData(type=REAL, default=0)),
        (UCCNNE, ColumnData(type=REAL, default=0)),
        (UCCNNH, ColumnData(type=REAL, default=0)),
        (EANION, ColumnData(type=REAL, default=0)),
        (ENEUTRAL, ColumnData(type=REAL, default=0)),
        (ECATION, ColumnData(type=REAL, default=0)),
        (HAS_E, ColumnData(type=INT, default=0)),
        (HAS_H, ColumnData(type=INT, default=0)),
        (OCCPE, ColumnData(type=REAL, default=DEFAULT_OCCUPANCY)),
        (OCCPH, ColumnData(type=REAL, default=DEFAULT_OCCUPANCY)),
    ])

    # The three site energy columns for each charge carrier type
    SITE_ENERGY_PROPS = {ELECTRON: [UCCNNE, UNCNNE, UCNCCE],
                         HOLE: [UCCNNH, UNCNNH, UCNCCH]}
    # yapf: enable

    def addRow(self, molecule, stypes):
        """
        Add the row for one molecule

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to add a row for

        :type stypes: dict
        :param stypes: Keys are segment names (atom pdbres names), values are
            the index of that segment
        """
        info = self.getMoleculeInfo(molecule)
        self._addRow({
            self.SQID: info.index,
            self.NAME: info.name,
            # The type column holds the segment type index, not its name
            self.TYPE: stypes[self.getSegmentType(molecule)],
            self.MOL: info.index,
            self.POSX: info.posx,
            self.POSY: info.posy,
            self.POSZ: info.posz,
        })
class SegmentTypesTable(Table):
    """
    The segmentTypes table. Each row holds the index and name of one segment
    type; the file-related columns keep their placeholder defaults.
    """

    TABLE_NAME = 'segmentTypes'

    # Column names
    BASIS = 'basis'
    ORBFILE = 'orbfile'
    TORBNRS = 'torbnrs'
    COORDFILE = 'coordfile'
    CANRIGID = 'canRigid'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (BASIS, ColumnData(type=TEXT, default=SQL_NONAME)),
        (ORBFILE, ColumnData(type=TEXT, default=SQL_NOFILE)),
        (TORBNRS, ColumnData(type=TEXT, default=SQL_NOT_USED)),
        (COORDFILE, ColumnData(type=TEXT, default=SQL_NOFILE)),
        (CANRIGID, ColumnData(type=INT, default=0)),
    ])
    # yapf: enable

    def addRow(self, stype, index):
        """
        Add the row for one segment type

        :type stype: str
        :param stype: The segment type name (should be an atom.pdbres name)

        :type index: int
        :param index: The segment type index
        """
        self._addRow({self.SQID: index, self.NAME: stype})
class FragmentsTable(Table):
    """
    The fragments table. One row is written per molecule, and that row uses
    the molecule index as the fragment id, molecule id and segment id.
    """

    TABLE_NAME = 'fragments'

    # Column names
    SYMMETRY = 'symmetry'
    LEG1 = 'leg1'
    LEG2 = 'leg2'
    LEG3 = 'leg3'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
        (Table.MOL, ColumnData(type=INT, default=None)),
        (Table.SEG, ColumnData(type=INT, default=None)),
        (Table.POSX, ColumnData(type=REAL, default=None)),
        (Table.POSY, ColumnData(type=REAL, default=None)),
        (Table.POSZ, ColumnData(type=REAL, default=None)),
        (SYMMETRY, ColumnData(type=INT, default=-1)),
        (LEG1, ColumnData(type=INT, default=1)),
        (LEG2, ColumnData(type=INT, default=2)),
        (LEG3, ColumnData(type=INT, default=3)),
    ])
    # yapf: enable

    def addRow(self, molecule):
        """
        Add the row for one molecule

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to add a row for
        """
        info = self.getMoleculeInfo(molecule)
        self._addRow({
            self.SQID: info.index,
            self.NAME: info.name,
            self.TYPE: info.mtype,
            self.MOL: info.index,
            self.SEG: info.index,
            self.POSX: info.posx,
            self.POSY: info.posy,
            self.POSZ: info.posz,
        })
class AtomsTable(Table):
    """
    The atoms table. Each row holds one atom; the atom's molecule number is
    used for its molecule, segment and fragment ids.
    """

    TABLE_NAME = 'atoms'

    # Column names
    FRAG = 'frag'
    RESNR = 'resnr'
    RESNAME = 'resname'
    WEIGHT = 'weight'
    ELEMENT = 'element'
    QMID = 'qmid'
    QMPOSX = 'qmPosX'
    QMPOSY = 'qmPosY'
    QMPOSZ = 'qmPosZ'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=INT, default=None)),
        (Table.MOL, ColumnData(type=INT, default=None)),
        (Table.SEG, ColumnData(type=INT, default=None)),
        (FRAG, ColumnData(type=INT, default=None)),
        (RESNR, ColumnData(type=INT, default=1)),
        (RESNAME, ColumnData(type=TEXT, default=None)),
        (Table.POSX, ColumnData(type=REAL, default=None)),
        (Table.POSY, ColumnData(type=REAL, default=None)),
        (Table.POSZ, ColumnData(type=REAL, default=None)),
        (WEIGHT, ColumnData(type=REAL, default=None)),
        (ELEMENT, ColumnData(type=TEXT, default=None)),
        (QMID, ColumnData(type=INT, default=0)),
        (QMPOSX, ColumnData(type=REAL, default=0.0)),
        (QMPOSY, ColumnData(type=REAL, default=0.0)),
        (QMPOSZ, ColumnData(type=REAL, default=0.0)),
    ])
    # yapf: enable

    def addRow(self, atom):
        """
        Add the row for one atom

        :type atom: `structure.Structure._StructureAtom`
        :param atom: The atom to add a row for
        """
        # NOTE(review): the type column is declared INT but receives the atom
        # pdbname - confirm this is intended
        self._addRow({
            self.SQID: atom.index,
            self.NAME: atom.pdbname,
            self.TYPE: atom.pdbname,
            self.MOL: atom.molecule_number,
            self.SEG: atom.molecule_number,
            self.FRAG: atom.molecule_number,
            self.RESNAME: atom.pdbres,
            # Atom position is stored in nanometers
            self.POSX: atom.x / 10,
            self.POSY: atom.y / 10,
            self.POSZ: atom.z / 10,
            self.WEIGHT: atom.atomic_weight,
            self.ELEMENT: atom.element,
        })
class PairsTable(Table):
    """
    The pairs table. Each row holds one pair of molecules and the vector
    between them; the remaining columns take their default values when the
    row is added and can be changed later with `setRowProperty`.
    """

    TABLE_NAME = 'pairs'

    # The molecule numbers of the two segments involved in a dimer
    SEG1 = 'seg1'
    SEG2 = 'seg2'
    # Delta X, Y and Z distance between two molecules in a dimer
    DRX = 'drx'
    DRY = 'dry'
    DRZ = 'drz'
    LOE = 'lOe'
    LOH = 'lOh'
    HAS_E = 'has_e'
    HAS_H = 'has_h'
    RATE12E = 'rate12e'
    RATE21E = 'rate21e'
    RATE12H = 'rate12h'
    RATE21H = 'rate21h'
    JEFF2E = 'Jeff2e'
    JEFF2H = 'Jeff2h'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (SEG1, ColumnData(type=INT, default=None)),
        (SEG2, ColumnData(type=INT, default=None)),
        (DRX, ColumnData(type=REAL, default=None)),
        (DRY, ColumnData(type=REAL, default=None)),
        (DRZ, ColumnData(type=REAL, default=None)),
        (LOE, ColumnData(type=REAL, default=0)),
        (LOH, ColumnData(type=REAL, default=0)),
        (HAS_E, ColumnData(type=INT, default=0)),
        (HAS_H, ColumnData(type=INT, default=0)),
        (RATE12E, ColumnData(type=REAL, default=0)),
        (RATE21E, ColumnData(type=REAL, default=0)),
        (RATE12H, ColumnData(type=REAL, default=0)),
        (RATE21H, ColumnData(type=REAL, default=0)),
        (JEFF2E, ColumnData(type=REAL, default=0)),
        (JEFF2H, ColumnData(type=REAL, default=0)),
        (Table.TYPE, ColumnData(type=INT, default=0)),
    ])
    # yapf: enable

    def addRow(self, index, dimer):
        """
        Add a row to the table

        :type index: int
        :param index: The index of this pair

        :type dimer: `schrodinger.application.matsci.clusterstruct.Dimer`
        :param dimer: The Dimer object for this row
        """
        # The lower molecule number is always stored as seg1
        mol1, mol2 = sorted(dimer.molnumbers)
        atom_a = dimer.neighbor_info.home_atom
        atom_b = dimer.neighbor_info.neighbor_atom
        # presumably xyz is a list, so this is the six coordinates of the two
        # atoms in a row - TODO confirm
        coords = atom_a.xyz + atom_b.xyz
        # Must make sure the PBC is accounted for
        dx, dy, dz = dimer.pbc.getShortestVector(*coords)
        props = {}
        # Pair ID
        props[self.SQID] = index
        # Molecules involved
        props[self.SEG1] = mol1
        props[self.SEG2] = mol2
        # Delta coordinates for the closest approach between the molecules,
        # values are in NM
        props[self.DRX] = dx / 10.
        props[self.DRY] = dy / 10.
        props[self.DRZ] = dz / 10.
        self._addRow(props)

    @classmethod
    def setRowProperty(cls, mols, prop, value, cursor):
        """
        Set the value of a property in the row for the given pair of molecules

        :param iterable mols: The two mol numbers involved in this pair
        :param str prop: The name of property column to set
        :param any value: The value to set for the property. It is formatted
            directly into the SQL command.
        :param `sqlite3.Cursor` cursor: The cursor to use

        :raise RuntimeError: If value has spaces
        """
        # Values that do not support a str membership test (numbers, for
        # instance) raise TypeError and cannot contain a space. The test must
        # look for ' ' - the empty string is contained in every string and
        # would reject all string values.
        try:
            has_space = " " in value
        except TypeError:
            has_space = False
        if has_space:
            raise RuntimeError('Values with spaces are not allowed')
        # Rows are stored with the lower molecule number as seg1
        mol1, mol2 = sorted(mols)
        cursor.execute(f'UPDATE {cls.TABLE_NAME} SET {prop} = {value} '
                       f'WHERE {cls.SEG1} = {mol1} AND {cls.SEG2} = {mol2}')
class SuperExchangeTable(Table):
    """
    The superExchange table

    Note: unused. The class only defines the table's columns.
    """

    TABLE_NAME = 'superExchange'

    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
    ])
    # yapf: enable
class SchrodingerTable(Table):
    """
    The schrodinger table. This contains Schrodinger-specific information and
    is not used by VOTCA

    Unlike the other tables, columns of this table may be left without a
    value (NULL_ALLOWED is True).
    """

    TABLE_NAME = 'schrodinger'
    NULL_ALLOWED = True

    # Column names
    MOLFORM = 'mol_formula'
    VOLUME = 'pbc_volume_Ang3'
    JOBID = 'jobid'
    STRUCTURE_PATH = 'structure_path'
    STRUCTURE_FILE = 'structure_file'
    PAIR_DISTANCE = 'pair_distance_Ang'
    PAIR_TYPE = 'pair_type'
    VERSION = 'version'
    # Note - stopped using JUMPFILE in 20-1
    JUMPFILE = 'jumpfile'
    JUMPSUMMARY = 'jumpsummary'
    RUNTIME = 'runtime'
    SEED = 'seed'
    FIELDX = 'fieldX'
    FIELDY = 'fieldY'
    FIELDZ = 'fieldZ'
    TEMPERATURE = 'temperature'
    RATEFILE = 'ratefile'
    CARRIERTYPE = 'carriertype'
    SITE_KEYWORDS = 'site_keywords'
    FIX_SITE_KEYWORDS = 'site_fix_keywords'

    # yapf: disable
    COLUMNS = OrderedDict([
        (MOLFORM, ColumnData(type=TEXT, default=None)),
        (VOLUME, ColumnData(type=REAL, default=0.0)),
        (JOBID, ColumnData(type=TEXT, default=None)),
        (STRUCTURE_PATH, ColumnData(type=TEXT, default=None)),
        (STRUCTURE_FILE, ColumnData(type=TEXT, default=None)),
        (PAIR_DISTANCE, ColumnData(type=REAL, default=0.0)),
        (PAIR_TYPE, ColumnData(type=REAL, default=None)),
        (VERSION, ColumnData(type=REAL, default=DB_VERSION)),
        (JUMPSUMMARY, ColumnData(type=TEXT, default=None)),
        (RUNTIME, ColumnData(type=REAL, default=None)),
        (SEED, ColumnData(type=INT, default=None)),
        (FIELDX, ColumnData(type=REAL, default=None)),
        (FIELDY, ColumnData(type=REAL, default=None)),
        (FIELDZ, ColumnData(type=REAL, default=None)),
        (TEMPERATURE, ColumnData(type=REAL, default=None)),
        (RATEFILE, ColumnData(type=TEXT, default=None)),
        (CARRIERTYPE, ColumnData(type=TEXT, default=None)),
        (SITE_KEYWORDS, ColumnData(type=TEXT, default=None)),
        (FIX_SITE_KEYWORDS, ColumnData(type=TEXT, default=None)),
    ])
    # yapf: enable

    def addRow(self, struct):
        """
        Add the row describing the structure

        Only the molecular formula and the box volume are filled in here.

        :type struct: `schrodinger.structure.Structure`
        :param struct: The structure for the database
        """
        formula = analyze.generate_molecular_formula(struct)
        # Box volume; 0.0 is stored if getting the box raises KeyError
        try:
            box = cms.get_box(struct)
        except KeyError:
            volume = 0.0
        else:
            volume = cms.get_boxvolume(box)
        self._addRow({self.MOLFORM: formula, self.VOLUME: volume})
[docs]class DatabaseManager(object): """ Manage initialization and filling of SQL database tables """
class Cursor(SQLCursor):
    """
    Context manager that opens one cursor on the manager's database and
    hands it to every table of the manager.

    Note that when adding many rows it saves a huge amount of time to
    create the cursor once and then close it when finished rather than
    create/close a cursor for each row.
    """

    def __init__(self, manager):
        """
        Create a Cursor instance

        :param manager: The DatabaseManager whose path and tables are used
        """
        self.manager = manager
        super().__init__(manager.path)

    def __enter__(self):
        """
        Open the cursor and set it on every table of the manager

        :rtype: sqlite3.Cursor
        :return: The newly opened cursor
        """
        opened = super().__enter__()
        for managed_table in self.manager.tables.values():
            managed_table.setCursor(opened)
        return self.cursor

    def __exit__(self, *args):
        """
        Remove the cursor from every table, then commit and close
        """
        # Tables must not keep a reference to a cursor that is being closed
        for managed_table in self.manager.tables.values():
            managed_table.setCursor(None)
        super().__exit__()
TABLE_CLASSES = (FramesTable, PairsTable, MoleculesTable, SegmentsTable, FragmentsTable, AtomsTable, SegmentTypesTable, SchrodingerTable, SuperExchangeTable)
[docs] def __init__(self, struct, filename): """ Create a DatabaseManager instance :type struct: `schrodinger.structure.Structure` :param struct: The structure to find dimers in :type filename: str :param filename: The name of the SQL file to create """ self.struct = struct self.path = filename self.tables = {x.TABLE_NAME: x(self.path) for x in self.TABLE_CLASSES} self.segment_types = {}
[docs] def initializeDatabase(self): """ Create all the tables and fill all but the pairs table with initial data """ with self.Cursor(self): for table in self.tables.values(): table.create() self.fillAtoms() self.fillFragments() self.fillFrames() self.fillMolecules() self.fillSegmentTypes() self.fillSegments() self.fillSchrodinger()
[docs] def fillAtoms(self): """ Fill the atoms table """ # Atoms must have a name for mol in self.struct.molecule: for atom in mol.atom: if not atom.pdbname.strip(): atom.pdbname = atom.element + str(atom.number_by_molecule) table = self.tables[AtomsTable.TABLE_NAME] for atom in self.struct.atom: table.addRow(atom)
[docs] def fillFragments(self): """ Fill the fragments table """ table = self.tables[FragmentsTable.TABLE_NAME] for mol in self.struct.molecule: table.addRow(mol)
[docs] def fillFrames(self): """ Fill the frames table """ table = self.tables[FramesTable.TABLE_NAME] table.addRow(self.struct)
[docs] def fillMolecules(self): """ Fill the molecules table """ table = self.tables[MoleculesTable.TABLE_NAME] for mol in self.struct.molecule: table.addRow(mol)
[docs] def fillSegmentTypes(self): """ Fill the segmentTypes table """ for mol in self.struct.molecule: resname = Table.getSegmentType(mol) if resname not in self.segment_types: self.segment_types[resname] = len(self.segment_types) + 1 table = self.tables[SegmentTypesTable.TABLE_NAME] for stype, index in self.segment_types.items(): table.addRow(stype, index)
[docs] def fillSegments(self): """ Fill the segments table :raise RuntimeError: If fillSegmentTypes has not been called yet """ if not self.segment_types: raise RuntimeError('fillSegmentTypes must be called before fill ' 'Segments') table = self.tables[SegmentsTable.TABLE_NAME] for mol in self.struct.molecule: table.addRow(mol, self.segment_types)
[docs] def fillSchrodinger(self): """ Fill the schrodinger table """ table = self.tables[SchrodingerTable.TABLE_NAME] table.addRow(self.struct)
[docs] def fillPairs(self, dist, pair_type=SQL_HEAVY): """ Find all dimers in the given structure based on the normal Schrodinger dimer finding algorithm. Add all found dimers to the given VOTCA SQL file. :type dist: float :param dist: The distance threshold for defining dimers :type pair_type: str :param pair_type: Either SQL_HEAVY (heavy atom distances only) or SQL_ALL (all atoms are considered when determining pair distance) :rtype: int :return: The number of dimers found """ if pair_type != SQL_HEAVY and pair_type != SQL_ALL: raise ValueError('pair_type must be SQL_HEAVY or SQL_ALL') heavy_only = pair_type == SQL_HEAVY dimers = clusterstruct.get_dimers_in_structure(self.struct, distance=dist, heavy_only=heavy_only) # Fill the database table = self.tables[PairsTable.TABLE_NAME] with self.Cursor(self): for index, dimer in enumerate(dimers, 1): table.addRow(index, dimer) set_schrodinger_db_value(self.path, SchrodingerTable.PAIR_TYPE, pair_type) set_schrodinger_db_value(self.path, SchrodingerTable.PAIR_DISTANCE, dist) return len(dimers)
def sql_command(cursor, cmd):
    """
    Execute an SQL command on an existing cursor. The cursor is not closed and
    nothing is saved to the database by this function.

    :type cursor: sqlite3.Cursor
    :param cursor: The cursor used

    :type cmd: str
    :param cmd: The SQL command to perform

    :rtype: bool
    :return: True if the command executed, False if the command raised a no
        such table error

    :raise sqlite3.OperationalError: For any operational error other than a
        missing table
    """
    try:
        cursor.execute(cmd)
    except sqlite3.OperationalError as err:
        if not is_no_table_error(err):
            # Unknown condition, let the caller see it
            raise
        # This database has no such table
        return False
    else:
        return True
def table_rows(db_path, table, orderby=None):
    """
    Generator for all the rows in a specific table of the database

    All rows are read and the cursor is closed before the first row is
    yielded, so a caller that stops iterating early does not leave the
    database open. Nothing is yielded if the table does not exist.

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :type table: str
    :param table: The name of the table to get the rows for

    :type orderby: str or None
    :param orderby: If given, the text placed after ORDER BY in the query
        (typically a column name)

    :rtype: sqlite3.Row
    :return: Yields each row in the table

    :raise sqlite3.OperationalError: If the query fails for a reason other
        than a missing table
    """
    cmd = f'SELECT * FROM {table}'
    if orderby:
        cmd += f' ORDER BY {orderby}'

    with SQLCursor(db_path) as cursor:
        # fetchall loads every row anyway, so there is no reason to keep the
        # cursor open while the caller consumes them
        rows = cursor.fetchall() if sql_command(cursor, cmd) else []

    yield from rows
def delete_all_rows(db_path, table):
    """
    Remove every row from the given table. A missing table is silently
    ignored.

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :type table: str
    :param table: The name of the table to delete the rows from
    """
    cmd = f'DELETE FROM {table}'
    with SQLCursor(db_path) as cursor:
        sql_command(cursor, cmd)
def is_no_table_error(exc):
    """
    Check whether an exception reports that the requested table does not
    exist. The check looks for 'no such table' in the exception text.

    :type exc: Exception
    :param exc: The Exception to check

    :rtype: bool
    :return: Whether this exception is for a missing table
    """
    message = str(exc)
    return 'no such table' in message
def add_schrodinger_column(db_path, name):
    """
    Add a column to the schrodinger table. This may be needed if the SQL file
    was created with an older version that didn't include this column

    The column gets the type and default recorded for it in
    SchrodingerTable.COLUMNS. A default of None becomes an SQL NULL rather
    than the text 'None'.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type name: str
    :param name: The name of the column, must be a key in
        SchrodingerTable.COLUMNS

    :raise KeyError: If name is not a key in SchrodingerTable.COLUMNS
    """
    data = SchrodingerTable.COLUMNS[name]
    if data.default is None:
        # Formatting None into quotes would make the default the string
        # 'None', which reads back as a real (truthy) value
        default = 'NULL'
    else:
        default = f"'{data.default}'"
    # ALTER TABLE schrodinger ADD COLUMN 'bob' REAL DEFAULT '0.0'
    cmd = (f"ALTER TABLE {SchrodingerTable.TABLE_NAME} ADD COLUMN "
           f"'{name}' {data.type} DEFAULT {default}")
    with SQLCursor(db_path) as cursor:
        cursor.execute(cmd)
def set_schrodinger_db_value(db_path, name, value):
    """
    Set the value of the given column in the Schrodinger table. The update
    has no WHERE clause, so it applies to every row of the table.

    If the column does not exist (a database created before the column was
    introduced), the column is added and the update is retried.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type name: str
    :param name: The name of the column, must be a key in
        SchrodingerTable.COLUMNS

    :param value: The value to put into the database. The type of the
        parameter should be consistent with the expected type for that column.

    :raise sqlite3.OperationalError: If the update fails for any reason other
        than a missing column
    """
    # UPDATE schrodinger SET bob=?
    # The value is bound as a parameter so that text containing quote
    # characters (directory names, for instance) cannot break the statement.
    # It is bound in its formatted text form, which is what the previous
    # quoted-literal statement stored.
    cmd = f"UPDATE {SchrodingerTable.TABLE_NAME} SET {name}=?"
    params = (f'{value}',)

    with SQLCursor(db_path) as cursor:
        try:
            cursor.execute(cmd, params)
        except sqlite3.OperationalError as msg:
            if 'no such column' not in str(msg):
                # Unknown case, let's see the error
                raise
            # An old version of the database that pre-dates this column. Add
            # the column.
            add_schrodinger_column(db_path, name)
            cursor.execute(cmd, params)
def store_schrodinger_job_props(db_path, mae_name, struct):
    """
    Record the job id, structure directory and structure file name of the
    current job in the Schrodinger table of the database

    Under job control the job id is also stored on the structure as the
    VOTCA_JOB_ID property, and the directory is the job's original launch
    directory. Without job control the job id is SQL_NOJOB and the directory
    is the current working directory.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type mae_name: str
    :param mae_name: The name of the Maestro file that will hold the structure

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure to add corresponding job info props to
    """

    def record(column, value):
        """ Write one value into the Schrodinger table """
        set_schrodinger_db_value(db_path, column, value)

    backend = jobcontrol.get_backend()
    if not backend:
        record(SchrodingerTable.JOBID, SQL_NOJOB)
        record(SchrodingerTable.STRUCTURE_PATH, os.getcwd())
    else:
        job = backend.getJob()
        job_id = job.JobId
        record(SchrodingerTable.JOBID, job_id)
        struct.property[VOTCA_JOB_ID] = job_id
        record(SchrodingerTable.STRUCTURE_PATH, job.OrigLaunchDir)
    record(SchrodingerTable.STRUCTURE_FILE, mae_name)
def get_schrodinger_db_value(db_path, name):
    """
    Read the value of one column from the Schrodinger table in the database.
    The value comes from the first row that has the requested column.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type name: str
    :param name: The name of the column to get the data from

    :rtype: variable or None
    :return: The value for the given column in the Schrodinger table, or None
        if no such table exists or no such column exists
    """
    for row in table_rows(db_path, SchrodingerTable.TABLE_NAME):
        # Only a single Schrodinger row is expected
        try:
            value = row[name]
        except IndexError:
            # This row has no information for the requested name
            continue
        # Backwards compatibility for 'none' values in Schrodinger table
        # MATSCI-11011
        return None if value == SQL_DEPRECATED_NONE else value
    return None
def get_db_structure_path(db_path, existence_check=True):
    """
    Get the path to the structure that created this database

    The structure file is looked for first in the directory recorded in the
    database and then in the directory that holds the database itself.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type existence_check: bool
    :param existence_check: If True, return None if the path in the database
        does not point to an existing file. If False, return the path
        regardless of whether the file exists.

    :rtype: pathlib.Path or None
    :return: The Path to the structure file, or None if no path is found in
        the database or existence_check=True and the file does not exist
    """
    fname = get_schrodinger_db_value(db_path, SchrodingerTable.STRUCTURE_FILE)
    if not fname:
        # No file name recorded (for instance a database that pre-dates the
        # column). Without a name there is nothing to locate, and joining a
        # directory with None would raise a TypeError.
        return None

    path = get_schrodinger_db_value(db_path, SchrodingerTable.STRUCTURE_PATH)

    # Candidate directories in order of preference
    directories = []
    if path:
        directories.append(path)
    if db_path:
        directories.append(os.path.dirname(db_path))

    for directory in directories:
        full_path = pathlib.Path(directory) / fname
        if not existence_check or full_path.exists():
            return full_path
    return None
def add_pairs_to_database(struct, path, dist, pair_type=SQL_HEAVY):
    """
    Convenience wrapper that builds a `DatabaseManager` for the structure and
    database and has it find all dimers and add them to the VOTCA SQL file.

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure with the pairs

    :type path: str
    :param path: The path to the SQL database

    :type dist: float
    :param dist: The distance threshold for defining dimers

    :type pair_type: str
    :param pair_type: Either SQL_HEAVY (heavy atom distances only) or SQL_ALL
        (all atoms are considered when determining pair distance)

    :rtype: int
    :return: The number of dimers found
    """
    return DatabaseManager(struct, path).fillPairs(dist, pair_type=pair_type)
def get_pairs_from_database(db_path):
    """
    Rebuild the dimers stored in the pairs table of the database

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :rtype: list
    :return: Each item of the list is a
        `schrodinger.application.matsci.clusterstruct.Dimer` object. The list
        is empty if the pairs table has not been populated. Note that the
        Dimer objects will not have set the home_atom or neighbor_atom
        properties of the neighbor_info property.
    """

    def dimer_from_row(row):
        """ Build a Dimer from one row of the pairs table """
        seg1 = row[PairsTable.SEG1]
        seg2 = row[PairsTable.SEG2]
        # Squared separation from the stored displacement components
        drx = row[PairsTable.DRX]
        dry = row[PairsTable.DRY]
        drz = row[PairsTable.DRZ]
        neighbor = clusterstruct.Neighbor(home_atom=None,
                                          neighbor_atom=None,
                                          dsq=drx * drx + dry * dry +
                                          drz * drz)
        return clusterstruct.Dimer(0, seg1, seg2, neighbor)

    return [
        dimer_from_row(row)
        for row in table_rows(db_path, PairsTable.TABLE_NAME)
    ]
def has_pair_data(db_path):
    """
    Report whether the database has pair data, based on the pair type stored
    in the Schrodinger table

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :rtype: bool or str
    :return: If no data, False. If data, the distance type used to find pairs -
        either SQL_HEAVY or SQL_ALL
    """
    pair_type = get_schrodinger_db_value(db_path, SchrodingerTable.PAIR_TYPE)
    return False if pair_type is None else pair_type
def get_pair_info(db_path):
    """
    Look up how the pairs currently stored in the database were determined

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :rtype: (str, float) or None
    :return: The type of distance used to find pairs (SQL_HEAVY or SQL_ALL)
        and the distance cutoff for pairs. None is returned if no pair data
        exists.
    """
    pair_type = has_pair_data(db_path)
    if pair_type:
        cutoff = get_schrodinger_db_value(db_path,
                                          SchrodingerTable.PAIR_DISTANCE)
        return pair_type, cutoff
    return None
def find_missing_coupling_data(path, charge):
    """
    Find the pairs whose coupling value for the given charge type is 0. Pairs
    are examined in SQID order.

    :param str path: The path to the database file

    :param str charge: Either `HOLE` or `ELECTRON`

    :rtype: list
    :return: Each item is a tuple with the molecule numbers of the two
        molecules involved in the missing coupling term.
    """
    # The coupling column name is the base name plus the charge suffix
    column = JEFF + CHARGE_ENDINGS[charge]
    rows = table_rows(path, PairsTable.TABLE_NAME, orderby=PairsTable.SQID)
    return [(row[PairsTable.SEG1], row[PairsTable.SEG2])
            for row in rows
            if row[column] == 0.0]
def find_missing_site_energies(path, charge):
    """
    Find the segments that have at least one site energy property equal to 0
    for the given charge type. Segments are examined in SQID order.

    :param str path: The path to the database file

    :param str charge: Either `HOLE` or `ELECTRON`

    :rtype: list
    :return: Each item is the integer SQID (which translates to molecule
        number) of any segment with missing site energy information
    """
    energy_columns = SegmentsTable.SITE_ENERGY_PROPS[charge]
    rows = table_rows(path,
                      SegmentsTable.TABLE_NAME,
                      orderby=SegmentsTable.SQID)
    return [
        row[SegmentsTable.SQID]
        for row in rows
        if any(row[column] == 0.0 for column in energy_columns)
    ]
def copy_sql_data(source, destination, table, columns):
    """
    Copy the given columns of a table from the source database to the
    destination database. Rows are matched between the two databases by their
    SQID value.

    :type source: str or pathlib.Path
    :param source: the path to the source database

    :type destination: str or pathlib.Path
    :param destination: the path to the destination database

    :type table: str
    :param table: The name of the table to copy from

    :type columns: list
    :param columns: A list of column names to copy. Nothing is copied if the
        list is empty.

    :raise IndexError: If the two databases do not have the same number of
        rows
    """
    # Read the source once and reuse it for both the count and the copy
    source_rows = list(table_rows(source, table))
    source_num = len(source_rows)
    dest_num = len(list(table_rows(destination, table)))
    if source_num != dest_num:
        raise IndexError('Cannot copy data because the source database has '
                         f'{source_num} rows but the destination database has '
                         f'{dest_num} rows.')

    columns = list(columns)
    if not columns:
        # An UPDATE with an empty SET clause is a syntax error
        return

    # Values are bound as parameters so that text and NULL values survive the
    # copy. Table and column names cannot be bound and are formatted in.
    assignments = ' , '.join(f'{name} = ?' for name in columns)
    cmd = f'UPDATE {table} SET {assignments} WHERE {Table.SQID} = ?'
    parameters = [
        tuple(row[name] for name in columns) + (row[Table.SQID],)
        for row in source_rows
    ]
    with SQLCursor(destination) as cursor:
        cursor.executemany(cmd, parameters)
class SQLCreationError(Exception):
    """
    Exception signalling that the database could not be created
    """
def generate_votca_database(struct, backend=None):
    """
    Create a VOTCA SQL database for the structure and fill its initial tables

    The file is named after the job when a backend is given, otherwise after
    the cleaned structure title ('votca_input' being the fallback name). With
    a backend, the file is also registered as a job output file. Pair data is
    not added here.

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure to create a database for

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The backend if one exists

    :rtype: str
    :return: The name of the sql file that was created

    NOTE(review): earlier documentation listed SQLCreationError as raised
    here; nothing in this function raises it directly - confirm whether the
    database manager does.
    """
    basename = (backend.getJob().Name if backend else jobutils.clean_string(
        struct.title, default='votca_input'))
    sqlname = basename + '.sql'

    DatabaseManager(struct, sqlname).initializeDatabase()

    if backend:
        backend.addOutputFile(sqlname)
    return sqlname