Source code for schrodinger.application.matsci.mecp_mod

"""
Classes and functions for finding MECPs.

Copyright Schrodinger, LLC. All rights reserved."""

import argparse
import math
import os
import random
from collections import OrderedDict
from past.utils import old_div

import numpy
import scipy.constants
import scipy.optimize

from schrodinger import structure
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.jaguar.output import restart_name
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import textlogger
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.structutils import transform
from schrodinger.utils import cmdline

DEFAULT_CHARGE_1 = 0
DEFAULT_CHARGE_2 = 0
DEFAULT_MULTIP_1 = 3
DEFAULT_MULTIP_2 = 1
DEFAULT_STATE_1 = 0
DEFAULT_STATE_2 = 1
DEFAULT_DATA = [(DEFAULT_CHARGE_1, DEFAULT_MULTIP_1, DEFAULT_STATE_1), \
    (DEFAULT_CHARGE_2, DEFAULT_MULTIP_2, DEFAULT_STATE_2)]

DEFAULT_SCF_GS = False

DEFAULT_GS_KWARGS = OrderedDict([('iuhf', 2), ('dftname', 'B3LYP'),
                                 ('basis', 'MIDIX'), ('isymm', 0),
                                 ('maxit', 100), ('nofail', 0), ('nops', 1),
                                 ('iacc', 1)])
DEFAULT_ES_KWARGS = OrderedDict([('itddft', 1), ('itda', 1)])
DEFAULT_KWARGS = OrderedDict(DEFAULT_GS_KWARGS)
DEFAULT_KWARGS.update(DEFAULT_ES_KWARGS)
RESERVED_JAGUAR_KEYS = ['multip', 'molchg', 'igeopt', 'itddft', \
    'nroot', 'target', 'rsinglet', 'rtriplet']
DEFAULT_JAGUAR_OPTIONS = \
    ['%s=%s' % pair for pair in DEFAULT_KWARGS.items()]

DEFAULT_ITERATIONS = 50

DEFAULT_EPS = 0.0001

IDENTITY_HESS = 'identity'
DIAG_FWD_FIN_DIFF_GRAD_HESS = 'diag_fwd_fin_diff_grad'
HESS_CHOICES = [IDENTITY_HESS, DIAG_FWD_FIN_DIFF_GRAD_HESS]
DEFAULT_HESS = IDENTITY_HESS

PROJECTION = 'projection'
UPDATING = 'updating'
METHOD_CHOICES = [PROJECTION, UPDATING]
DEFAULT_METHOD = PROJECTION

DEFAULT_PERP_FACTOR = 100.0
DEFAULT_PARA_FACTOR = 1.0

DELTA_ENERGY = 'delta_energy'
MAX_FORCE = 'max_force'
RMS_FORCE = 'rms_force'
MAX_DISPLACEMENT = 'max_displacement'
RMS_DISPLACEMENT = 'rms_displacement'
DEFAULT_DELTA_ENERGY = -1.0E-6
DEFAULT_MAX_FORCE = 1.0E-4
DEFAULT_RMS_FORCE = 1.0E-4
DEFAULT_MAX_DISPLACEMENT = 1.0E-5
DEFAULT_RMS_DISPLACEMENT = 1.0E-5
DEFAULT_CONVERGENCE_PAIRS = [(DELTA_ENERGY, DEFAULT_DELTA_ENERGY),
                             (MAX_FORCE, DEFAULT_MAX_FORCE),
                             (RMS_FORCE, DEFAULT_RMS_FORCE),
                             (MAX_DISPLACEMENT, DEFAULT_MAX_DISPLACEMENT),
                             (RMS_DISPLACEMENT, DEFAULT_RMS_DISPLACEMENT)]
DEFAULT_CONVERGENCE_DICT = OrderedDict(DEFAULT_CONVERGENCE_PAIRS)
DEFAULT_CONVERGENCE = \
    msutils.keyword_dict_to_string(DEFAULT_CONVERGENCE_DICT).split()

INPUT = 'input'
OPT_1 = 'opt_1'
OPT_2 = 'opt_2'
GUESS_GEOM_CHOICES = [INPUT, OPT_1, OPT_2]
DEFAULT_GUESS_GEOM = INPUT

DEFAULT_ALL_GEOMETRIES = False
DEFAULT_ALL_RESULTS = False
DEFAULT_PROPERTIES = False
DEFAULT_BASE_NAME = 'mecp'
DEFAULT_VERBOSE = False

LOCAL_HOST = 'localhost'
DEFAULT_HOST = LOCAL_HOST
DEFAULT_TPP = 1
DEFAULT_NPROC = 2
DEFAULT_NRESOURCES = DEFAULT_TPP * DEFAULT_NPROC

JOB_DJ_RETRIES = 0

MAE_EXT = '.mae'
IN_EXT = '.in'
OUT_EXT = '.out'
ALL_FILE_SUFFIX = '_out_all' + MAE_EXT
OUT_FILE_SUFFIX = '_out' + MAE_EXT
STATE_OPTIMIZATIONS = 'state_optimizations'

MSG_WIDTH = 100
DEFAULT_GEOMETRY_HEADER = 'Geometry / Ang.'
DEFAULT_DISPLACEMENTS_HEADER = 'Displacements / Ang.'
DEFAULT_ENERGY_HEADER = 'Energy / Hartree'
DEFAULT_FORCES_HEADER = 'Forces / Hartree/Ang.'

TITLE_KEY = 's_m_title'
MECP_ENERGY_1_KEY = 'r_matsci_E_MECP(State_1)/Hartree'
MECP_ENERGY_2_KEY = 'r_matsci_E_MECP(State_2)/Hartree'
BARRIER_KEY = 'r_matsci_%s_barrier(State_%s)/kcal/mol'
BARRIER_1_KEY = BARRIER_KEY % ('Forward', '1')
BARRIER_2_KEY = BARRIER_KEY % ('Reverse', '2')

DEFAULT_C1 = 1.0E-4
DEFAULT_C2 = 0.9
DEFAULT_MAX_STEP_SIZE = 50.0
DEFAULT_MIN_STEP_SIZE = 1.0E-8
DEFAULT_XTOL = 1.0E-14

# the following functions are not used in the MECP calculation
# but rather included as debug functions, to run with debug functions
# change DEBUG from None to a tuple of functions to call for states 1
# and 2, for example DEBUG = (rosen, rosen) or (quadratic_1, quadratic_2),
# the mocking is done at the deepest level to allow deep debugging, a typical
# input deck for a debug run looks like:
#
# $SCHRODINGER/run mecp.py -iterations 500 -verbose
# -input_file structure.mae
#


[docs]def rosen(x_vec): """ The Rosenbrock function. The target solution should be all ones and the function value here should be zero. :type x_vec: numpy.array :param x_vec: the solution vector (N X 1) :rtype: float, numpy.array :return: the function value and Jacobian (N X 1) """ return scipy.optimize.rosen(x_vec), scipy.optimize.rosen_der(x_vec)
[docs]def quadratic_1(x_vec): """ The x**2 + y**2 + 2*x function. The target solution is (x = -1, y = 0) with f = -1. The minimum crossing point with quadratic_2 is (x = -1/2, y = -1/2). :type x_vec: numpy.array :param x_vec: the solution vector (N X 1) :rtype: float, numpy.array :return: the function value and Jacobian (N X 1) """ x = x_vec[0] y = x_vec[1] energy = x**2 + y**2 + 2 * x gradient = numpy.zeros(len(x_vec)) gradient[0] = 2 * x + 2 gradient[1] = 2 * y return energy, gradient
[docs]def quadratic_2(x_vec): """ The x**2 + y**2 + 2*y function. The target solution is (x = 0, y = -1) with f = -1. The minimum crossing point with quadratic_1 is (x = -1/2, y = -1/2). :type x_vec: numpy.array :param x_vec: the solution vector (N X 1) :rtype: float, numpy.array :return: the function value and Jacobian (N X 1) """ x = x_vec[0] y = x_vec[1] energy = x**2 + y**2 + 2 * y gradient = numpy.zeros(len(x_vec)) gradient[0] = 2 * x gradient[1] = 2 * y + 2 return energy, gradient
[docs]def quadratic_3(x_vec): """ The x**2 + y**2 + 6*x function. The target solution is (x = -3, y = 0) with f = -9. The minimum crossing point with quadratic_2 is (x = -3/10, y = -9/10). :type x_vec: numpy.array :param x_vec: the solution vector (N X 1) :rtype: float, numpy.array :return: the function value and Jacobian (N X 1) """ x = x_vec[0] y = x_vec[1] energy = x**2 + y**2 + 6 * x gradient = numpy.zeros(len(x_vec)) gradient[0] = 2 * x + 6 gradient[1] = 2 * y return energy, gradient
[docs]def get_guess(nvar=33, amp=5, seed=123): """ Return a guess solution. :type nvar: int :param nvar: the number of variables :type amp: int :param amp: the amplitude of the set of random numbers used in generating the guess solution :type seed: int :param seed: seed for random :rtype: numpy.array :return: the guess geometry (N X 1) """ random.seed(seed) return numpy.array([amp * random.random() for i in range(nvar)])
[docs]def set_e_and_g(astructure, jobs): """ Set the energies and gradients for a debug run. :type astructure: `schrodinger.structure.Structure` :param astructure: the structure containing the solution as coordinates :type jobs: list :param jobs: contains instances of JaguarJob """ x_n = matrix_to_vector(astructure.getXYZ()) for job, func in zip(jobs, DEBUG): energy, gradient = func(x_n) forces = -1 * vector_to_matrix(gradient) job.debug_energy = energy job.debug_forces = forces job.jag_restart_input_file = None
DEBUG = None
[docs]class ParserWrapper(object): """ Manages the argparse module to parse user command line arguments. """
[docs] def __init__(self, scriptname, description): """ Create a ParserWrapper instance and process it. :type scriptname: str :param scriptname: name of this script :type description: str :param description: description of this script """ name = '$SCHRODINGER/run ' + scriptname self.parserobj = argparse.ArgumentParser( prog=name, description=description, add_help=False, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
[docs] def loadIt(self): """ Load ParserWrapper with options. """ charge_1_help = """Specify the net molecular charge of one of the two relevant states (refer to it as the first state although the order doesn't matter).""" self.parserobj.add_argument('-charge_1', type=int, default=DEFAULT_CHARGE_1, help=charge_1_help) charge_2_help = """Specify the net molecular charge of the other relevant state (refer to it as the second state although the order doesn't matter).""" self.parserobj.add_argument('-charge_2', type=int, default=DEFAULT_CHARGE_2, help=charge_2_help) multip_1_help = """Specify the multiplicity of the first state.""" self.parserobj.add_argument('-multip_1', type=int, default=DEFAULT_MULTIP_1, help=multip_1_help) multip_2_help = """Specify the multiplicity of the second state.""" self.parserobj.add_argument('-multip_2', type=int, default=DEFAULT_MULTIP_2, help=multip_2_help) state_1_help = """Specify the first state using 0 for the ground state and <N> for the N-th excited state. The indexing used for the excited states is the same as that used with the 'nroot' and 'target' &gen section keywords for excited state calculations with Jaguar.""" self.parserobj.add_argument('-state_1', type=int, default=DEFAULT_STATE_1, help=state_1_help) state_2_help = """Specify the second state using 0 for the ground state and <N> for the N-th excited state. The indexing used for the excited states is the same as that used with the 'nroot' and 'target' &gen section keywords for excited state calculations with Jaguar.""" self.parserobj.add_argument('-state_2', type=int, default=DEFAULT_STATE_2, help=state_2_help) scf_gs_help = """Specify that the ground state of states with non-singlet multiplicities be determined using an SCF rather than as an excited state using a ground state singlet reference state. Note that this option is currently automatically enabled for all multiplicities except triplets.""" self.parserobj.add_argument('-scf_gs', action='store_true', help=scf_gs_help) jaguar_options_help = """Specify Jaguar &gen section key-value pairs. Each key-value pair should be separated from the next by whitespace and each should be represented in terms of a '<key>=<value>' pair. If neither of the given states are excited states then the excited state options will be removed from the default Jaguar options given here. This option has a default value of: %s.""" metavar = ('<key_1>=<value_1>', '<key_2>=<value_2>') self.parserobj.add_argument('-jaguar_options', nargs='*', type=str, metavar=metavar, default=DEFAULT_JAGUAR_OPTIONS, help=jaguar_options_help % ' '.join(DEFAULT_JAGUAR_OPTIONS)) iterations_help = """Specify the maximum number of MECP geometry optimization iterations.""" self.parserobj.add_argument('-iterations', type=int, default=DEFAULT_ITERATIONS, help=iterations_help) eps_help = """Specify the step size, in units of Angstrom, to use in any finite difference approximations.""" self.parserobj.add_argument('-eps', type=float, default=DEFAULT_EPS, help=eps_help) init_hess_help = """Specify what to use for the initial guess Hessian used in the BFGS optimization. The choice %s is the identity matrix. The choice %s uses a diagonal forward finite difference of the MECP gradient. Finite difference approximations use the option -eps to specify the step size.""" self.parserobj.add_argument( '-init_hess', type=str, choices=HESS_CHOICES, default=DEFAULT_HESS, help=init_hess_help % (IDENTITY_HESS, DIAG_FWD_FIN_DIFF_GRAD_HESS)) method_help = """Specify the method to use for determining the MECP. The choice %s is the gradient projection method where the MECP gradient includes a term for the gradient difference vector plus a term in which the gradient difference vector is projected out of the gradient averaged over the two states. The choice %s is the updating branching plane method and is analogous to choice %s except that a vector that is both orthogonal to the gradient difference vector and on the branching plane is additionally projected out of the gradient averaged over the two states.""" self.parserobj.add_argument('-method', type=str, choices=METHOD_CHOICES, default=DEFAULT_METHOD, help=method_help % (PROJECTION, UPDATING, PROJECTION)) perp_factor_help = """Specify the prefactor, in units of inverse Hartree, of the energy term for which the gradient is perpendicular to the crossing seam.""" self.parserobj.add_argument('-perp_factor', type=float, default=DEFAULT_PERP_FACTOR, help=perp_factor_help) para_factor_help = """Specify the unitless prefactor of the energy term for which the gradient is parallel to the crossing seam. Note that setting this value to 0 reduces the MECP search to that of an arbitrary crossing point rather than a minimum energy crossing point. The arbitrary point may or may not be the minimum energy point as it will depend on the initial geometry.""" self.parserobj.add_argument('-para_factor', type=float, default=DEFAULT_PARA_FACTOR, help=para_factor_help) convergence_help = """Specify various convergence criteria for the MECP geometry optimization. Options include (1) %s, which specifies a target energy decrease between the k and k-1 iterations (default of %s Hartree), (2) %s, which specifies a target maximum magnitude element of the Cartesian forces matrix (default of %s Hartree/Ang.), (3) %s, which specifies a target RMS Cartesian force (default of %s Hartree/Ang.), (4) %s, which specifies a target maximum magnitude element of the Cartesian displacement matrix between the k and k-1 iterations (default of %s Ang.), and (5) %s, which specifies a target RMS Cartesian displacement (default of %s Ang.). Input should be whitespace separated <key>=<value> pairs. The MECP geometry optimization will terminate once any one of these conditions are met.""" self.parserobj.add_argument( '-convergence', nargs='*', type=str, default=DEFAULT_CONVERGENCE, help=convergence_help % (DELTA_ENERGY, DEFAULT_DELTA_ENERGY, MAX_FORCE, DEFAULT_MAX_FORCE, RMS_FORCE, DEFAULT_RMS_FORCE, MAX_DISPLACEMENT, DEFAULT_MAX_DISPLACEMENT, RMS_DISPLACEMENT, DEFAULT_RMS_DISPLACEMENT)) guess_geom_help = """Specify the initial geometry used in the MECP geometry optimization. The choices are %s, which is the input geometry, %s, which will do a geometry optimization for the first state and use its optimized geometry, and %s, which will do a geometry optimization for the second state and use its optimized geometry.""" self.parserobj.add_argument('-guess_geom', type=str, choices=GUESS_GEOM_CHOICES, default=DEFAULT_GUESS_GEOM, help=guess_geom_help % (INPUT, OPT_1, OPT_2)) all_geometries_help = """Use this option to report all geometries in the output from this script, i.e. the geometries from all steps of the MECP geometry optimization will be reported in the output file.""" self.parserobj.add_argument('-all_geometries', action='store_true', help=all_geometries_help) all_results_help = """Use this option to copy all subdirectories containing results from intermediate Jaguar force, etc. calculations back to the launch host.""" self.parserobj.add_argument('-all_results', action='store_true', help=all_results_help) properties_help = """Use this option to report properties on the final MECP structure. These properties include (1) and (2) the energy differences between the energies at the MECP less the energies at the optimized geometries of the two states.""" self.parserobj.add_argument('-properties', action='store_true', help=properties_help) base_name_help = """Specify a base name to use for naming job-related files, etc.""" self.parserobj.add_argument('-base_name', type=str, default=DEFAULT_BASE_NAME, help=base_name_help) input_file_help = """Specify a Maestro file containing a single input structure.""" self.parserobj.add_argument('-input_file', type=str, required=True, help=input_file_help) self.parserobj.add_argument('-verbose', action='store_true', help='Turn on verbose printing.') self.parserobj.add_argument('-help', '-h', action='help', default=argparse.SUPPRESS, help='Show this help message and exit.') jc_options = [cmdline.HOST, cmdline.NOJOBID, cmdline.SAVE, \ cmdline.WAIT] cmdline.add_jobcontrol_options(self.parserobj, options=jc_options) tpp_help = """Specify the number of threads to use for parallelizing any intermediate Jaguar calculations.""" self.parserobj.add_argument('-TPP', type=int, default=DEFAULT_TPP, dest='tpp', help=tpp_help) nresources_help = """Specify the total number of resources for Jaguar calculations, i.e. the number of threads (see option -TPP) * the number of processors. Here the number of processors means the number of simultaneous Jaguar calculations each of which uses -TPP threads. For MECP calculations this value can only be 1 or 2.""" self.parserobj.add_argument('-NPROC', type=int, default=DEFAULT_NRESOURCES, dest='nresources', help=nresources_help)
[docs] def parseArgs(self, args): """ Parse the command line arguments. :type args: tuple :param args: command line arguments """ self.options = self.parserobj.parse_args(args)
[docs]class InputError(Exception): pass
[docs]class CheckInput(object): """ Manages checking input. """
[docs] @staticmethod def checkChargesMultiplicitiesStates(charge_1, charge_2, multip_1, multip_2, state_1, state_2): """ Check the charges, multiplicities, and states. :type charge_1: int :param charge_1: the charge of the first state :type charge_2: int :param charge_2: the charge of the second state :type multip_1: int :param multip_1: the multiplicity of the first state :type multip_2: int :param multip_2: the multiplicity of the second state :type state_1: int :param state_1: the first state, 0 for ground state, N for the N-th excited state :type state_2: int :param state_2: the second state, 0 for ground state, N for the N-th excited state :raise: InputError if there is an issue """ msg = ('The MECP calculation requires charge, multiplicity, and ' 'state data for two different states yet you have provided ' 'a single charge of %s, multiplicity of %s, and state of %s.') if charge_1 == charge_2 and multip_1 == multip_2 and state_1 == state_2: raise InputError(msg % (charge_1, multip_1, state_1)) msg = ('The charges given for the two states require multiplicities ' 'belonging to different manifolds, for example one ' 'multiplicity must be even and the other must be odd.') mod_charge = abs(charge_1 - charge_2) % 2 mod_multip = abs(multip_1 - multip_2) % 2 cond_1 = mod_charge and not mod_multip cond_2 = not mod_charge and mod_multip if cond_1 or cond_2: raise InputError(msg)
[docs] @staticmethod def checkParallelization(tpp, nresources): """ Check the parallelization options. :type tpp: int :param tpp: the number of threads to use for this Jaguar job, i.e. -TPP (threads-per-process) :type nresources: int :param nresources: the number of resources for Jaguar calculations, i.e. the number of threads * the number of processors :raise: InputError if there is an issue """ if nresources % tpp: msg = ('The total number (N) of resources requested, %s, ' 'via -NPROC <N> must be a multiple of the -TPP ' 'value requested, %s.') raise InputError(msg % (nresources, tpp)) nproc = old_div(nresources, tpp) if nproc > 2: msg = ('You have requested to run %s simultaneous subjobs ' 'but this script can only run up to 2 simultaneous subjobs.') raise InputError(msg % nproc)
[docs]def get_final_state(state, scf_gs, multiplicity): """ Return the final state. :type state: int :param state: the electronic state, 0 is the ground state and <N> is the N-th excited state :type scf_gs: bool :param scf_gs: specify whether ground states should be determined using an SCF :type multiplicity: int :param multiplicity: molecular multiplicity :rtype: int or None :return: the final state or None if there isn't one """ if not scf_gs and multiplicity != 1: final_state = state + 1 elif state: final_state = state else: final_state = None return final_state
[docs]class JaguarJob(object): """ Manages a Jaguar job. """ KEYS = ['idx', 'base_name', 'charge', 'multiplicity', 'state', 'scf_gs', \ 'kwargs', 'mae_input_file', 'jag_input_file', 'jag_output_file', \ 'jag_restart_input_file', 'in_template', 'job']
[docs] def __init__(self, idx=None, base_name=None, charge=None, multiplicity=None, state=None, scf_gs=None, kwargs=None, mae_input_file=None, jag_input_file=None, jag_output_file=None, jag_restart_input_file=None, in_template=None, job=None): """ Create an instance. :type idx: int :param idx: the index :type base_name: str :param base_name: the base name :type charge: int :param charge: net molecular charge :type multiplicity: int :param multiplicity: molecular multiplicity :type state: int :param state: the electronic state, 0 is the ground state and <N> is the N-th excited state :type scf_gs: bool :param scf_gs: specify whether ground states should be determined using an SCF :type kwargs: dict :param kwargs: dictionary of Jaguar &gen section key-value pairs :type mae_input_file: str :param mae_input_file: Maestro input file :type jag_input_file: str :param jag_input_file: Jaguar input file :type jag_output_file: str :param jag_output_file: Jaguar output file :type jag_restart_input_file: str :param jag_restart_input_file: Jaguar restart input file :type in_template: str :param in_template: a Jaguar input file to use as template :type job: queue.JobControlJob :param job: job object """ self.idx = idx self.base_name = base_name self.charge = charge self.multiplicity = multiplicity self.state = state self.scf_gs = scf_gs self.kwargs = kwargs self.mae_input_file = mae_input_file self.jag_input_file = jag_input_file self.jag_output_file = jag_output_file self.jag_restart_input_file = jag_restart_input_file self.in_template = in_template self.job = job self.results = None self.debug_energy = None self.debug_forces = None
[docs] def getFinalTotalEnergy(self): """ Return the final total energy accounting for any excitation energies. :rtype: float :return: the total energy in hartree """ if DEBUG is not None: return self.debug_energy # although this might be an 'rtriplet=1' calculation .excitation_energies # may be used (note that there is also a .triplet_excitation_energies # attr) final_state = get_final_state(self.state, self.scf_gs, self.multiplicity) if final_state is not None: excitation_energy = \ self.results.last_results.excitation_energies[final_state - 1] excitation_energy = ev_to_hartree(excitation_energy) else: excitation_energy = 0.0 return self.results.last_results.energy + excitation_energy
[docs] def getFinalForcesHartreePerAngstrom(self): """ Return the final forces in units of hartree/angstrom. :rtype: numpy.array :return: the forces in hartree/angstrom in natoms X 3 """ # currently Jaguar excited state optimizations do not follow # the excited state wavefunction in a qualitative sense but # rather always use the excited state by <target> number (see # &gen keywords) and so we follow the same convention here # which for our test set appears to be OK if DEBUG is not None: forces = self.debug_forces else: forces = self.results.last_results.forces return reciprocal_bohr_to_angstrom(forces)
[docs] def getFinalStructure(self): """ Return the final structure. :type astructure: `schrodinger.structure.Structure` :param astructure: the final structure """ return self.results.getStructure()
[docs]class JaguarError(Exception): pass
[docs]class JaguarJobs(object): """ Manages Jaguar jobs. """ # chosen so that with the typical use of target=1, nroot=5 EXTRA_NROOT = 4
[docs] def __init__(self, data=DEFAULT_DATA, scf_gs=DEFAULT_SCF_GS, kwargs=DEFAULT_KWARGS, base_name=DEFAULT_BASE_NAME, host=DEFAULT_HOST, nproc=DEFAULT_NPROC, tpp=DEFAULT_TPP, logger=None): """ Create an instance. :type data: list :param data: contains (charge, multiplicity, state) tuples for all jobs, for electronic state 0 is the ground state and <N> is the N-th excited state :type scf_gs: bool :param scf_gs: specify whether ground states should be determined using an SCF :type kwargs: dict :param kwargs: dictionary of Jaguar &gen section key-value pairs :type base_name: str :param base_name: a base name used to name the jobs and their related files :type host: str :param host: the host on which to run the jobs :type nproc: int :param nproc: the number of processors to use for running the jobs, i.e. the number of simultaneous jobs :type tpp: int :param tpp: the number of threads to use for the jobs, i.e. -TPP (threads-per-process) :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ self.data = data self.scf_gs = scf_gs self.kwargs = OrderedDict(kwargs) self.base_name = base_name self.host = host self.nproc = nproc self.tpp = tpp self.logger = logger self.astructure = None self.in_templates = None self.launch_dir = None self.jobs = []
[docs] def clearJobs(self): """ Clear the list of jobs attribute. """ self.jobs = []
[docs] def setStructure(self, astructure): """ Set the structure to use for the jobs. :type astructure: `schrodinger.structure.Structure` :param astructure: the structure to use for the jobs """ self.astructure = astructure
[docs] def setInTemplates(self, in_templates=None): """ Set the input templates to use for the jobs. :type in_templates: list or None :param in_templates: a list of Jaguar input files to use as templates for creating the jobs, if used then the length of this list should be equivalent to that of the data attribute, the first ZMAT section is overwritten with the input astructure argument, use None if there aren't any templates :raise: JaguarError if in_templates data is the wrong size """ if in_templates: if len(in_templates) != len(self.data): msg = ('The number of input templates and the the number ' 'of data given in the constructor must be equivalent.') raise JaguarError(msg) else: self.in_templates = in_templates
[docs] def setUpLaunchDir(self, launch_dir=None): """ Set up the launch directory for the jobs. :type launch_dir: str or None :param launch_dir: the directory from which to launch the jobs or None if there isn't one (in which case the CWD will be used) """ if launch_dir is None: self.launch_dir = os.curdir else: self.launch_dir = launch_dir if not os.path.exists(self.launch_dir): os.mkdir(self.launch_dir)
[docs] def getBaseName(self, idx): """ Return a base name for the given job. :type idx: int :param idx: the job index :rtype: str :return: base name for the job """ return self.base_name + '_' + str(idx)
[docs] def getKwargs(self, charge, multiplicity, state): """ Return the kwargs for the given job. :type charge: int :param charge: net molecular charge :type multiplicity: int :param multiplicity: molecular multiplicity :type state: int :param state: the electronic state, 0 is the ground state and <N> is the N-th excited state :rtype: dict :return: Jaguar kwargs for the job """ kwargs = OrderedDict(self.kwargs) kwargs['molchg'] = charge if self.scf_gs: amult = multiplicity else: amult = 1 kwargs['multip'] = amult final_state = get_final_state(state, self.scf_gs, multiplicity) if final_state is not None: kwargs['nroot'] = final_state + self.EXTRA_NROOT kwargs['target'] = final_state else: kwargs.pop('itddft', None) kwargs.pop('itda', None) if not self.scf_gs: if multiplicity == 3: kwargs['rtriplet'] = 1 return kwargs
[docs] def getFileNames(self, base_name): """ Return a list of files names for the given job. :type base_name: str :param base_name: base name for the job :rtype: list :return: file names for the job """ afunc = lambda x: os.path.join(self.launch_dir, x) exts = [MAE_EXT, IN_EXT, OUT_EXT] files = [afunc(base_name + ext) for ext in exts] restart_input_file = afunc(restart_name(base_name) + IN_EXT) files.append(restart_input_file) return files
[docs] def getInTemplate(self, idx): """ Return an input template file. :type idx: int :param idx: the job index :rtype: str :return: the input template file """ if self.in_templates: in_template = self.in_templates[idx - 1] else: in_template = None return in_template
[docs] def getJob(self, base_name, jag_input_file): """ Return a queue.JobControlJob for the given job. :type base_name: str :param base_name: base name for the job :type jag_input_file: str :param jag_input_file: Jaguar input file for the job :rtype: queue.JobControlJob :return: the job object """ apath, afile = os.path.split(jag_input_file) cmd = ['jaguar', 'run'] if self.tpp > 1: cmd.extend(['-TPP', str(self.tpp)]) return queue.JobControlJob(cmd + [afile], command_dir=self.launch_dir, name=base_name)
[docs] def setUpInputFiles(self, job): """ Set up the input files for the job. :type job: JaguarJob :param job: a JaguarJob object containing various job parameters """ kwargs = {'structure': self.astructure, 'name': job.base_name} if job.in_template: kwargs['input'] = job.in_template ji = JaguarInput(**kwargs) ji.setValues(job.kwargs) ji.setStructure(self.astructure, set_molchg_multip=False) ji.writeMaefile(job.mae_input_file, [self.astructure]) if ji.sectionDefined('valid'): ji.deleteSection('valid') ji.saveAs(job.jag_input_file)
[docs] def runJobs(self): """ Run the jobs. """ dj = queue.JobDJ(max_retries=JOB_DJ_RETRIES, max_failures=queue.NOLIMIT, verbosity='normal', hosts=[(self.host, self.nproc)]) for job in self.jobs: dj.addJob(job.job) dj.run()
[docs] def processOutput(self): """ Process the output from the jobs. :raise: JaguarError if there were problems with the job """ for job in self.jobs: if not os.path.exists(job.jag_output_file): msg = ('Jaguar output file %s can not be found.') raise JaguarError(msg % job.jag_output_file) jo = JaguarOutput(job.jag_output_file) if jo.status != JaguarOutput.OK or jo.fatal_error: msg = ('The Jaguar job for %s died with error: %s.') if jo.fatal_error: err = jo.fatal_error.strip().replace('\n', '') else: err = str(jo.status) raise JaguarError(msg % (job.jag_input_file, err)) job.results = jo
[docs] def runIt(self, astructure, in_templates=None, launch_dir=None): """ Main function to run the jobs. :type astructure: `schrodinger.structure.Structure` :param astructure: the structure to use for the jobs :type in_templates: list or None :param in_templates: a list of Jaguar input files to use as templates for creating the jobs, if used then the length of this list should be equivalent to that of the data attribute, the first ZMAT section is overwritten with the input astructure argument, use None if there aren't any templates :type launch_dir: str or None :param launch_dir: the directory from which to launch the jobs or None if there isn't one (in which case the CWD will be used) :rtype: list :return: contains JaguarJob objects for all jobs """ self.clearJobs() self.setStructure(astructure) self.setInTemplates(in_templates=in_templates) self.setUpLaunchDir(launch_dir=launch_dir) for idx, data in enumerate(self.data, 1): base_name = self.getBaseName(idx) jaguar_kwargs = self.getKwargs(*data) file_names = self.getFileNames(base_name) in_template = self.getInTemplate(idx) jcjob = self.getJob(base_name, file_names[1]) args = [idx, base_name] + list(data) + \ [self.scf_gs, jaguar_kwargs] + file_names + \ [in_template, jcjob] kwargs = OrderedDict(list(zip(JaguarJob.KEYS, args))) job = JaguarJob(**kwargs) self.setUpInputFiles(job) self.jobs.append(job) if DEBUG is not None: set_e_and_g(self.astructure, self.jobs) else: self.runJobs() self.processOutput() return self.jobs
[docs]class TerminateException(Exception): pass
[docs]class IterationError(Exception): pass
[docs]class MECPStep(object): """ Manage an MECP step, which can be an iteration or a line search iteration. """ ALPHA_THRESH = 1.0e-10
[docs] def __init__(self, iteration, call_number, x_initial, method, perp_factor, para_factor, verbose, logger, bfgs_obj): """ Create an instance. :type iteration: int :param iteration: the iteration :type call_number: int :param call_number: the call number :type x_initial: numpy.array :param x_initial: the initial geometry vector (N X 1) :type method: str :param method: the method to use to determine the MECP :type perp_factor: float :param perp_factor: prefactor for the energy term whose gradient lies perpendicular to the crossing seam :type para_factor: float :param para_factor: prefactor for the energy term whose gradient lies parallel to the crossing seam :type verbose: bool :param verbose: specifies verbose logging :type logger: logging.Logger :param logger: output logger :type bfgs_obj: BFGS :param bfgs_obj: a BFGS object that manages the optimization """ self.iteration = iteration self.call_number = call_number self.x_initial = numpy.copy(x_initial) self.method = method self.perp_factor = perp_factor self.para_factor = para_factor self.verbose = verbose self.logger = logger self.bfgs_obj = bfgs_obj self.jobs = None self.energy_1 = None self.energy_2 = None self.perp_energy = None self.para_energy = None self.energy = None self.delta_energy = None self.weighted_energy = None self.forces_1 = None self.max_force_1 = None self.rms_force_1 = None self.len_force_1 = None self.forces_2 = None self.max_force_2 = None self.rms_force_2 = None self.len_force_2 = None self.angle = None self.len_delta_forces = None self.unit_delta_forces = None self.unit_norm_delta_forces = None self.perp_forces = None self.max_perp_force = None self.rms_perp_force = None self.len_perp_force = None self.para_forces = None self.max_para_force = None self.rms_para_force = None self.len_para_force = None self.forces = None self.max_force = None self.rms_force = None self.len_force = None self.weighted_forces = None self.weighted_gradients = None self.displacements = None self.max_displacement = None self.rms_displacement = None self.len_displacement = None self.in_templates = None self.summary_line = None self.x_final = None
[docs] def logInitialGeometry(self): """ Log the initial geometry. """ if self.verbose and self.logger: log_geometry(self.x_initial, header=DEFAULT_GEOMETRY_HEADER, logger=self.logger) self.logger.info('')
[docs] def logHessianEigVals(self): """ Log the Hessian eigenvalues. """ if self.verbose and self.logger: header = 'MECP Hessian eigenvalues / Hartree/(Ang.**2)' self.logger.info(header) self.logger.info('-' * len(header)) for eigval in self.bfgs_obj.hess_eigvals: self.logger.info('%.8f' % eigval) self.logger.info('')
[docs] def logStepInfo(self): """ Log the step info. """ if self.verbose and self.logger and self.bfgs_obj.step_size is not None: alen = transform.get_vector_magnitude(self.bfgs_obj.step_k) self.logger.info('Step direction length / Ang.: %.8f' % alen) self.logger.info('(Gradient, Step) angle / deg.: %.2f' % self.bfgs_obj.angle) self.logger.info('Step size: %.8f' % self.bfgs_obj.step_size) self.logger.info('')
[docs] def handleFinalGeometry(self, x_final): """ Handle the final geometry. :type x_final: numpy.array :param x_final: the final geometry vector (N X 1) """ self.x_final = numpy.copy(x_final) self.displacements = self.x_final - self.x_initial self.max_displacement, self.rms_displacement, self.len_displacement = \ get_max_and_rms_and_len(self.displacements) if self.verbose and self.logger: log_geometry(self.x_final, header='Updated geometry / Ang.', logger=self.logger) self.logger.info('') log_displacements(self.displacements, self.max_displacement, self.rms_displacement, self.len_displacement, logger=self.logger) self.logger.info('')
[docs] def setInTemplates(self): """ Set the Jaguar input templates. """ self.in_templates = [x.jag_restart_input_file for x in self.jobs]
[docs] def getUnitNormalToDeltaForces(self, prev_mecp_step=None): """ Get the unit vector that is normal to the delta forces vector. :type prev_mecp_step: MECPStep or None :param prev_mecp_step: MECPStep from the previous iteration or line search iteration or None if there isn't one :rtype: numpy.array or None :return: the unit vector perpendicular to the delta forces vector or None if there isn't one """ if prev_mecp_step and self.method == UPDATING: if prev_mecp_step.unit_norm_delta_forces is None: self.unit_norm_delta_forces = \ numpy.copy(prev_mecp_step.unit_delta_forces) else: numerator1 = numpy.dot(prev_mecp_step.unit_norm_delta_forces, self.unit_delta_forces) numerator2 = numpy.dot(prev_mecp_step.unit_delta_forces, self.unit_delta_forces) denominator = math.sqrt(numerator1**2 + numerator2**2) self.unit_norm_delta_forces = \ numerator1 * prev_mecp_step.unit_delta_forces - \ numerator2 * prev_mecp_step.unit_norm_delta_forces self.unit_norm_delta_forces /= denominator return self.unit_norm_delta_forces
[docs] def setEnergiesAndForces(self, prev_mecp_step=None): """ Set the energies and forces for the two states as well as well as the energy and forces to use for the MECP optimization. :type prev_mecp_step: MECPStep or None :param prev_mecp_step: MECPStep from the previous iteration or line search iteration or None if there isn't one """ # the following interesting Lagrangian can also be used to # find the MECP (it doesn't require a gradient projection and # thus the gradient does go to zero) however it often requires # finding the k parameter by trial-and-error and so in testing # it was ruled out in favor of the projection method, # # L = f_perp*(E2 - E1)**4 + f_para*(E2 - E2,min)(1 - exp(-k(E2 - E1)**2)), # # where f_perp = 100, f_para = 1, E2,min is the energy minimum # for state 2, and where the exp term is the inverse unit impulse # function, i.e. k is infinite (in practice ranging from 10**2 to # 10**8, etc.), for the correct choice of k the exp term modulates # the min term to function as a full minimization except for when # E2 - E1 is small enough for which it will be zero, other # possibilities might include third party engineering-type # solvers like https://www.geogebra.org/m/TEERJczX or CAD though # it is unclear whether those can be used for cases lacking # explicit functions and which therefore must solved as # minimization problems self.energy_1, self.energy_2 = \ [x.getFinalTotalEnergy() for x in self.jobs] self.forces_1, self.forces_2 = \ [x.getFinalForcesHartreePerAngstrom() for x in self.jobs] forces_1 = matrix_to_vector(self.forces_1) forces_2 = matrix_to_vector(self.forces_2) self.angle = get_angle_in_degrees(forces_1, forces_2) # this alpha prevents projecting on to the zero vector in # case the energies are equivalent e_diff = self.energy_2 - self.energy_1 if e_diff < 0: alpha = -1.0 * self.ALPHA_THRESH else: alpha = self.ALPHA_THRESH self.perp_energy = (e_diff + alpha)**2 self.para_energy = old_div((self.energy_1 + self.energy_2), 2) self.energy = self.perp_energy + self.para_energy delta_forces = forces_2 - forces_1 self.len_delta_forces = transform.get_vector_magnitude(delta_forces) self.unit_delta_forces = transform.get_normalized_vector(delta_forces) avg_forces = old_div((forces_1 + forces_2), 2) self.perp_forces = 2 * (e_diff + alpha) * delta_forces self.para_forces = avg_forces unit_perp_forces = transform.get_normalized_vector(self.perp_forces) projection = numpy.dot(self.para_forces, unit_perp_forces) * unit_perp_forces self.para_forces -= projection unit_norm_delta_forces = self.getUnitNormalToDeltaForces(prev_mecp_step) if unit_norm_delta_forces is not None: projection = numpy.dot(avg_forces, unit_norm_delta_forces) self.para_forces -= projection * unit_norm_delta_forces self.forces = self.perp_forces + self.para_forces self.weighted_energy = self.perp_factor * self.perp_energy + \ self.para_factor * self.para_energy self.weighted_forces = self.perp_factor * self.perp_forces + \ self.para_factor * self.para_forces self.weighted_gradients = -1 * self.weighted_forces self.perp_forces = vector_to_matrix(self.perp_forces) self.para_forces = vector_to_matrix(self.para_forces) self.forces = vector_to_matrix(self.forces)
[docs] def setEnergiesAndForcesData(self, prev_mecp_step=None): """ Set energies and forces data. :type prev_mecp_step: MECPStep or None :param prev_mecp_step: MECPStep from the previous iteration or line search iteration or None if there isn't one """ if prev_mecp_step: self.delta_energy = self.energy - prev_mecp_step.energy self.max_force_1, self.rms_force_1, self.len_force_1 = \ get_max_and_rms_and_len(self.forces_1) self.max_force_2, self.rms_force_2, self.len_force_2 = \ get_max_and_rms_and_len(self.forces_2) self.max_perp_force, self.rms_perp_force, self.len_perp_force = \ get_max_and_rms_and_len(self.perp_forces) self.max_para_force, self.rms_para_force, self.len_para_force = \ get_max_and_rms_and_len(self.para_forces) self.max_force, self.rms_force, self.len_force = \ get_max_and_rms_and_len(self.forces)
[docs] def setJobData(self, jobs, prev_mecp_step=None): """ Set job data. :type jobs: list :param jobs: contains the JaguarJob instances for the two jobs :type prev_mecp_step: MECPStep or None :param prev_mecp_step: MECPStep from the previous iteration or line search iteration or None if there isn't one """ self.jobs = jobs self.setInTemplates() self.setEnergiesAndForces(prev_mecp_step) self.setEnergiesAndForcesData(prev_mecp_step)
[docs] def logEnergyAndForces(self, header, energy, forces, max_force, rms_force, len_force, energy_header=DEFAULT_ENERGY_HEADER, forces_header=DEFAULT_FORCES_HEADER): """ Log energy and forces. :type header: str :param header: a header :type energy: float or None :param energy: the energy or None if there isn't one :type forces: numpy.array :param forces: the forces (natoms X 3) :type max_force: float :param max_force: the magnitude of the largest forces element :type rms_force: float :param rms_force: the RMS of forces :type len_force: float :param len_force: the length of forces :type energy_header: str :param energy_header: an energy header :type forces_header: str :param forces_header: a forces header """ if self.logger: self.logger.info(header) self.logger.info('=' * len(header)) log_energy_and_forces(energy, matrix_to_vector(forces), max_force, rms_force, len_force, energy_header=energy_header, forces_header=forces_header, logger=self.logger) self.logger.info('')
[docs] def getFormattedSummaryLine(self, entries): """ Return a formatted summary line from the given data entries. :type entries: list :param entries: the data to format :rtype: str :return: a formatted summary line """ spaces = 2 * ' ' idx_width = 6 prop_width = 20 ljust = lambda entries: spaces.join( [entry.ljust(idx_width) for entry in entries]) rjust = lambda entries: spaces.join( [entry.rjust(prop_width) for entry in entries]) return ljust(entries[:2]) + rjust(entries[2:])
[docs] def logSummaryHeader(self): """ Log the summary header. """ columns1 = ['opt', 'call', 'E_1/', 'E_2/', 'Energy/', \ 'delta Energy/', 'max Force/', 'RMS Force/', \ 'max Displacement/', 'RMS Displacement/'] columns2 = ['iter', 'number', 'Hartree', 'Hartree', 'Hartree', \ 'Hartree', 'Hartree/Ang.', 'Hartree/Ang.', 'Ang.', 'Ang.'] header1 = self.getFormattedSummaryLine(columns1) header2 = self.getFormattedSummaryLine(columns2) if self.logger: self.logger.info(header1) self.logger.info(header2) self.logger.info('=' * len(header2))
[docs] def logSummary(self): """ Log a summary. """ all_entries = [self.energy_1, self.energy_2, self.energy, \ self.delta_energy, self.max_force, self.rms_force, \ self.max_displacement, self.rms_displacement] if self.iteration == self.call_number == 1 or self.verbose: self.logSummaryHeader() entries = [str(self.iteration), str(self.call_number)] for entry in all_entries: if entry is None: entries.append(str(entry)) else: entries.append('%.8f' % entry) self.summary_line = self.getFormattedSummaryLine(entries) if self.logger: self.logger.info(self.summary_line) if self.verbose: self.logger.info('')
[docs] def logJobData(self): """ Log job data. """ if self.verbose and self.logger: self.logEnergyAndForces('State 1', self.energy_1, self.forces_1, self.max_force_1, self.rms_force_1, self.len_force_1) self.logEnergyAndForces('State 2', self.energy_2, self.forces_2, self.max_force_2, self.rms_force_2, self.len_force_2) self.logger.info('Delta forces length / Hartree/Ang.: %.8f' % self.len_delta_forces) self.logger.info('(Force 1, Force 2) angle / deg.: %.2f' % self.angle) self.logger.info('') self.logEnergyAndForces('MECP perp', None, self.perp_forces, self.max_perp_force, self.rms_perp_force, self.len_perp_force) self.logEnergyAndForces('MECP para', None, self.para_forces, self.max_para_force, self.rms_para_force, self.len_para_force) self.logEnergyAndForces('MECP', self.energy, self.forces, self.max_force, self.rms_force, self.len_force)
[docs] def terminate(self, convergence_dict): """ Terminate the MECP optimization. :type convergence_dict: dict :param convergence_dict: contains various convergence thresholds :raise: TerminateException if it is time to terminate """ datas = [self.delta_energy, self.max_force, self.rms_force, \ self.max_displacement, self.rms_displacement] keys = [DELTA_ENERGY, MAX_FORCE, RMS_FORCE, MAX_DISPLACEMENT, \ RMS_DISPLACEMENT] msg = ('The optimization has converged with a %s of ' '%.8f %s %.8f.') for data, key in zip(datas, keys): if data is not None: value = convergence_dict[key] if key == DELTA_ENERGY: condition = data <= 0 and data >= value comparator = '>=' else: condition = data <= value comparator = '<=' if condition: raise TerminateException(msg % (key, data, comparator, value))
[docs]class MinimizerError(Exception): pass
[docs]class BFGS(object): """ Manage a BFGS optimization. """ # the methods line_search_wolfe1 and minimize were # based on those from the scipy.optimize.optimize module # which has the following notice: # # ******NOTICE*************** # optimize.py module by Travis E. Oliphant # # You may copy and use this module as you see fit with no # guarantee implied provided you keep this notice in all copies. # *****END NOTICE************ # # see http://home.agh.edu.pl/~pba/pdfdoc/Numerical_Optimization.pdf # LARGE_RHO_K = 1000.0 IMAG_TOL = 0.000001 ANGLE_TOL = 0.01 NEG_HESS_TOL = -1.0e-2
[docs] def __init__(self, c1=DEFAULT_C1, c2=DEFAULT_C2, amax=DEFAULT_MAX_STEP_SIZE, amin=DEFAULT_MIN_STEP_SIZE, xtol=DEFAULT_XTOL, max_force=DEFAULT_MAX_FORCE, max_iterations=DEFAULT_ITERATIONS, eps=DEFAULT_EPS, init_hess=DEFAULT_HESS, verbose=DEFAULT_VERBOSE, logger=None): """ Create an instance. :type c1: float :param c1: parameter for Armijo condition rule :type c2: float :param c2: parameter for curvature condition rule :type amax: float :param amax: maximum allowable step size :type amin: float :param amin: minimum allowable step size :type xtol: float :param xtol: nonnegative relative tolerance for an acceptable step, the search exits with a warning if the relative difference between sty and stx is less than xtol where sty and stx define an interval :type max_force: float :param max_force: maximum allowable force element :type max_iterations: int :param max_iterations: the maximum number of iterations :type eps: float :param eps: step size in Angstrom for any finite difference approximations :type init_hess: str :param init_hess: the type of initial Hessian to use :type verbose: bool :param verbose: specifies verbose logging :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ self.c1 = c1 self.c2 = c2 self.amax = amax self.amin = amin self.xtol = xtol self.max_force = max_force self.max_iterations = max_iterations self.eps = eps self.init_hess = init_hess self.verbose = verbose self.logger = logger self.iteration = 1 self.inv_hess = None self.hess = None self.hess_eigvals = None self.hess_eigvecs = None self.step_k = None self.angle = None self.step_size = None self.resetFiniteDiffCall()
[docs] def line_search_wolfe1(self, f, fprime, xk, pk, gfk=None, old_fval=None, old_old_fval=None, args=(), c1=1e-4, c2=0.9, amax=50, amin=1e-8, xtol=1e-14): """ As `scalar_search_wolfe1` but do a line search to direction `pk` Parameters ---------- f : callable Function `f(x)` fprime : callable Gradient of `f` xk : array_like Current point pk : array_like Search direction gfk : array_like, optional Gradient of `f` at point `xk` old_fval : float, optional Value of `f` at point `xk` old_old_fval : float, optional Value of `f` at point preceding `xk` The rest of the parameters are the same as for `scalar_search_wolfe1`. Returns ------- stp, f_count, g_count, fval, old_fval As in `line_search_wolfe1` gval : array Gradient of `f` at the final point """ if gfk is None: gfk = fprime(xk) if isinstance(fprime, tuple): eps = fprime[1] fprime = fprime[0] newargs = (f, eps) + args gradient = False else: newargs = args gradient = True gval = [gfk] gc = [0] fc = [0] def phi(s): fc[0] += 1 return f(xk + s * pk, *args) def derphi(s): gval[0] = fprime(xk + s * pk, *newargs) if gradient: gc[0] += 1 else: fc[0] += len(xk) + 1 return numpy.dot(gval[0], pk) derphi0 = numpy.dot(gfk, pk) stp, fval, old_fval = self.scalar_search_wolfe1(phi, derphi, old_fval, old_old_fval, derphi0, c1=c1, c2=c2, amax=amax, amin=amin, xtol=xtol) return stp, fc[0], gc[0], fval, old_fval, gval[0]
[docs] def scalar_search_wolfe1(self, phi, derphi, phi0=None, old_phi0=None, derphi0=None, c1=1e-4, c2=0.9, amax=50, amin=1e-8, xtol=1e-14): """ Scalar function search for alpha that satisfies strong Wolfe conditions alpha > 0 is assumed to be a descent direction. Parameters ---------- phi : callable phi(alpha) Function at point `alpha` derphi : callable dphi(alpha) Derivative `d phi(alpha)/ds`. Returns a scalar. phi0 : float, optional Value of `f` at 0 old_phi0 : float, optional Value of `f` at the previous point derphi0 : float, optional Value `derphi` at 0 amax : float, optional Maximum step size c1, c2 : float, optional Wolfe parameters Returns ------- alpha : float Step size, or None if no suitable step was found phi : float Value of `phi` at the new point `alpha` phi0 : float Value of `phi` at `alpha=0` Notes ----- Uses routine DCSRCH from MINPACK. """ if phi0 is None: phi0 = phi(0.) if derphi0 is None: derphi0 = derphi(0.) if old_phi0 is not None and derphi0 != 0: alpha1 = min(1.0, 1.01 * 2 * (phi0 - old_phi0) / derphi0) if alpha1 < 0: alpha1 = 1.0 else: alpha1 = 1.0 phi1 = phi0 derphi1 = derphi0 isave = numpy.zeros((2,), numpy.intc) dsave = numpy.zeros((13,), float) task = b'START' # see http://ftp.mcs.anl.gov/pub/MINPACK-2/csrch/dcsrch.f maxiter = 30 for i in range(maxiter): stp, phi1, derphi1, task = scipy.optimize.minpack2.dcsrch( alpha1, phi1, derphi1, c1, c2, xtol, task, amin, amax, isave, dsave) if task[:2] == b'FG': alpha1 = stp phi1 = phi(stp) derphi1 = derphi(stp) else: break else: # maxiter reached, the line search did not converge stp = None if task[:5] == b'ERROR' or task[:4] == b'WARN': stp = None # failed return stp, phi1, phi0
[docs] def resetFiniteDiffCall(self): """ Reset the finite difference call. """ self.finite_diff_call = 0
[docs] def getInitialInvHessian(self, fun, jac, fun_0, jac_0, x_0): """ Return an initial guess for the inverse Hessian. :type fun: function :param fun: function to minimize :type jac: function :param jac: the Jacobian of the function being minimized :type fun_0: float :param fun_0: function value at initial solution :type jac_0: float :param jac_0: the Jacobian value at initial solution :type x_0: numpy.array :param x_0: initial solution :rtype: numpy.array :return: the initial guess inverse Hessian (N/3 X 3) """ nvar = len(x_0) if self.init_hess == IDENTITY_HESS: return numpy.eye(nvar) elif self.init_hess == DIAG_FWD_FIN_DIFF_GRAD_HESS: if self.logger: msg = ('Building an initial guess inverse Hessian using ' 'diagonal forward finite differences of the gradient.') self.logger.info(msg) self.logger.info('') diags = [] for idx in range(nvar): x_k = numpy.copy(x_0) x_k[idx] += self.eps self.finite_diff_call = idx + 1 fun_k = fun(x_k) jac_k = jac(x_k) diags.append(old_div(self.eps, (jac_k[idx] - jac_0[idx]))) self.resetFiniteDiffCall() return numpy.diag(numpy.array(diags))
[docs] def minimize(self, fun, x_0, jac=None, **kwargs): """ Minimization of a function using the BFGS algorithm. :type fun: function :param fun: function to minimize :type x_0: numpy.array :param x_0: initial solution :type jac: function :param jac: the Jacobian of the function being minimized :rtype: scipy.optimize.optimize.OptimizeResult :return: optimization parameters """ x_k = numpy.copy(x_0) old_fun_k = None fun_k = fun(x_k) jac_k = jac(x_k) idty = numpy.eye(len(x_0)) self.inv_hess = self.getInitialInvHessian(fun, jac, fun_k, jac_k, x_k) max_jac_k, rms_jac_k, len_jac_k = get_max_and_rms_and_len(jac_k) while max_jac_k > self.max_force and self.iteration <= self.max_iterations: self.hess = numpy.linalg.inv(self.inv_hess) hess_eigvals, self.hess_eigvecs = numpy.linalg.eig(self.hess) hess_eigvals_imag = numpy.imag(hess_eigvals) if any(hess_eigvals_imag): for imag in hess_eigvals_imag: if abs(imag) > self.IMAG_TOL: msg = ( 'Hessian is sufficiently non-symmetric and has an ' 'eigenvalue whose magnitude of the imaginary part is ' '%s which is larger than the acceptable threshold of %s.' ) raise MinimizerError(msg % (abs(imag), self.IMAG_TOL)) self.hess_eigvals = numpy.real(hess_eigvals) for eigval in self.hess_eigvals: if eigval < self.NEG_HESS_TOL: msg = ( 'Hessian eigenvalues have become sufficiently negative.' ) raise MinimizerError(msg) self.step_k = -numpy.dot(self.inv_hess, jac_k) self.angle = get_angle_in_degrees(self.step_k, -1 * jac_k) if abs(self.angle - 90.) <= self.ANGLE_TOL: msg = ('The gradient and step direction vectors are becoming ' 'orthogonal.') raise MinimizerError(msg) # originally a line_search_wolfe12 was called here which wrapped # a line_search_wolfe1 call followed by a line_search_wolfe2 call # (called if the first didn't converge) but I don't quite see # the point for this in this application ret = self.line_search_wolfe1(fun, jac, x_k, self.step_k, gfk=jac_k, old_fval=fun_k, old_old_fval=old_fun_k, c1=self.c1, c2=self.c2, amax=self.amax, amin=self.amin, xtol=self.xtol) if ret[0] is None: msg = ( 'Desired error not necessarily achieved due to precision loss.' ) raise MinimizerError(msg) else: self.step_size, n_fun_k, n_jac_k, fun_k, old_fun_k, new_jac_k = ret if not numpy.isfinite(fun_k): msg = ('Function value became infinite.') raise MinimizerError(msg) self.iteration += 1 s_k = self.step_size * self.step_k x_k = x_k + s_k if new_jac_k is None: new_jac_k = jac(x_k) y_k = new_jac_k - jac_k jac_k = new_jac_k max_jac_k, rms_jac_k, len_jac_k = get_max_and_rms_and_len(jac_k) if max_jac_k <= self.max_force: break msg = 'Divide-by-zero encountered: rho_k set to %s.' try: rho_k = old_div(1.0, numpy.dot(y_k, s_k)) except ZeroDivisionError: rho_k = self.LARGE_RHO_K if self.logger: self.logger.info(msg % rho_k) if numpy.isinf(rho_k): rho_k = self.LARGE_RHO_K if self.logger: self.logger.info(msg % rho_k) A1 = idty - rho_k * s_k[:, numpy.newaxis] * y_k[numpy.newaxis, :] A2 = idty - rho_k * y_k[:, numpy.newaxis] * s_k[numpy.newaxis, :] self.inv_hess = numpy.dot(A1, numpy.dot(self.inv_hess, A2)) + \ rho_k * s_k[:, numpy.newaxis] * s_k[numpy.newaxis, :] good_status = 0 if self.iteration > self.max_iterations: msg = 'Maximum number of iterations has been exceeded.' else: msg = 'Optimization terminated successfully.' return scipy.optimize.optimize.OptimizeResult( fun=fun_k, jac=jac_k, hess_inv=self.inv_hess, status=good_status, success=(good_status == 0), message=msg, x=x_k, nit=self.iteration)
[docs]class EnergyAndGradients(object): """ Manage energy and gradient calls. """
[docs] def __init__(self, astructure, data=DEFAULT_DATA, scf_gs=DEFAULT_SCF_GS, kwargs=DEFAULT_KWARGS, base_name=DEFAULT_BASE_NAME, all_results=DEFAULT_ALL_RESULTS, convergence_dict=DEFAULT_CONVERGENCE_DICT, max_iterations=DEFAULT_ITERATIONS, method=DEFAULT_METHOD, perp_factor=DEFAULT_PERP_FACTOR, para_factor=DEFAULT_PARA_FACTOR, all_geometries=DEFAULT_ALL_GEOMETRIES, host=DEFAULT_HOST, nproc=DEFAULT_NPROC, tpp=DEFAULT_TPP, verbose=DEFAULT_VERBOSE, logger=None, bfgs_obj=None): """ Create an instance. :type astructure: `schrodinger.structure.Structure` :param astructure: the structure for which the energy and gradients are needed :type data: list :param data: contains (charge, multiplicity, state) tuples for the jobs for the two MECP states, for electronic state 0 is the ground state and <N> is the N-th excited state :type scf_gs: bool :param scf_gs: specify whether ground states should be determined using an SCF :type kwargs: dict :param kwargs: dictionary of Jaguar &gen section key-value pairs :type base_name: str :param base_name: a base name used to name the jobs and their related files :type all_results: bool :param all_results: use this option to copy all subdirectories containing results from intermediate Jaguar force, etc. calculations back to the launch host :type convergence_dict: dict :param convergence_dict: contains various convergence thresholds :type max_iterations: int :param max_iterations: the maximum number of MECP geometry optimization iterations :type method: str :param method: the method to use to determine the MECP :type perp_factor: float :param perp_factor: prefactor for the energy term whose gradient lies perpendicular to the crossing seam :type para_factor: float :param para_factor: prefactor for the energy term whose gradient lies parallel to the crossing seam :type all_geometries: bool :param all_geometries: use this option to report all geometries, i.e. the geometries from all MECP geometries iterations will be reported in the output :type host: str :param host: the host on which to run the jobs :type nproc: int :param nproc: the number of processors to use for running the jobs, i.e. the number of simultaneous jobs :type tpp: int :param tpp: the number of threads to use for the jobs, i.e. -TPP (threads-per-process) :type verbose: bool :param verbose: specifies verbose logging :type logger: logging.Logger or None :param logger: output logger or None if there isn't one :type bfgs_obj: BFGS :param bfgs_obj: a BFGS object that manages the optimization """ self.astructure = astructure.copy() self.data = data self.scf_gs = scf_gs self.kwargs = OrderedDict(kwargs) self.base_name = base_name self.all_results = all_results self.convergence_dict = convergence_dict self.max_iterations = max_iterations self.method = method self.perp_factor = perp_factor self.para_factor = para_factor self.all_geometries = all_geometries self.host = host self.nproc = nproc self.tpp = tpp self.verbose = verbose self.logger = logger self.bfgs_obj = bfgs_obj self.jobbe = jobcontrol.get_backend() self.mae_out_file = self.base_name + ALL_FILE_SUFFIX self.mae_out_writer = None self.jaguar_jobs = None self.iteration_launch_dir = None self.prev_mecp_step = None self.summary_lines = [] self.call_number = None self.setUpMaeWriter() self.resetCallNumber() self.setUpJaguarJobs()
[docs] def setUpMaeWriter(self): """ Set up the Maestro output writer. """ if self.all_geometries: self.mae_out_writer = structure.MaestroWriter(self.mae_out_file) if self.jobbe: self.jobbe.addLogFile(self.mae_out_file)
[docs] def tearDownMaeWriter(self, exception): """ Tear down the Maestro output writer and then raise an exception. :type exception: Exception :param exception: the exception to raise after tear down """ if self.mae_out_writer: self.mae_out_writer.close() raise exception
[docs] def resetCallNumber(self): """ Reset the call number. """ self.call_number = 1
[docs] def setUpJaguarJobs(self): """ Set up for the Jaguar jobs. """ self.kwargs['igeopt'] = -1 self.jaguar_jobs = JaguarJobs(data=self.data, scf_gs=self.scf_gs, kwargs=self.kwargs, base_name=self.base_name, host=self.host, nproc=self.nproc, tpp=self.tpp, logger=self.logger)
[docs] def logHeader(self): """ Log a header. """ if self.verbose and self.logger: header = 'Iteration %s, call number %s' % (self.bfgs_obj.iteration, self.call_number) self.logger.info('+' * len(header)) self.logger.info(header) self.logger.info('+' * len(header)) self.logger.info('')
[docs] def newIteration(self): """ Return True if this is a new iteration. :rtype: bool :return: True if this is a new iteration, False otherwise """ if not self.prev_mecp_step: return True else: return self.bfgs_obj.iteration > self.prev_mecp_step.iteration
[docs] def setUpIterationLaunchDir(self): """ Set up the launch directory for this MECP iteration. :rtype: str :return: the launch directory for this MECP iteration """ self.iteration_launch_dir = self.base_name + '_' + \ str(self.bfgs_obj.iteration) if not os.path.exists(self.iteration_launch_dir): os.mkdir(self.iteration_launch_dir)
[docs] def getLaunchSubDir(self): """ Return the name of the launch subdirectory for the current job. :rtype: str :return: the launch subdirectory for the current job """ adir = os.path.join(self.iteration_launch_dir, self.base_name + '_' + str(self.call_number)) return adir
[docs] def runJobs(self, launch_dir): """ Run the jobs. :type launch_dir: str :param launch_dir: the launch directory for the jobs :rtype: list :return: contains the JaguarJob instances for the two jobs """ if self.prev_mecp_step: in_templates = self.prev_mecp_step.in_templates else: in_templates = None try: jobs = self.jaguar_jobs.runIt(self.astructure, in_templates=in_templates, launch_dir=launch_dir) except JaguarError as err: if self.jobbe: self.jobbe.copyOutputFile(launch_dir) self.tearDownMaeWriter(err) if self.all_results and self.jobbe: self.jobbe.copyOutputFile(launch_dir) return jobs
[docs] def logSummary(self): """ Log a summary. """ if self.logger: header = 'Summary' self.logger.info(header) self.logger.info('-' * len(header)) self.prev_mecp_step.logSummaryHeader() for aline in self.summary_lines: self.logger.info(aline) self.logger.info('')
[docs] def handleStructure(self, x_to_use, step=None): """ Handle setting and writing the structure. :type x_to_use: numpy.array :param x_to_use: the geometry vector to use (N X 1) :type step: MECPStep or None :param step: the step from which to obtain the energy to use or None if there is no energy """ if step is not None: iteration = step.iteration call_number = step.call_number else: iteration = self.bfgs_obj.iteration call_number = self.call_number self.astructure.setXYZ(vector_to_matrix(x_to_use)) properties = {TITLE_KEY: self.base_name + '_' + \ str(iteration) + '_' + str(call_number)} if step is not None: energies = { MECP_ENERGY_1_KEY: step.energy_1, MECP_ENERGY_2_KEY: step.energy_2 } properties.update(energies) else: self.astructure.property.pop(MECP_ENERGY_1_KEY) self.astructure.property.pop(MECP_ENERGY_2_KEY) set_properties_and_write_structure(self.astructure, properties, self.mae_out_writer)
[docs] def finishPrevStep(self, next_x): """ Finish the previous step. :type next_x: numpy.array :param next_x: the next geometry vector (N X 1) """ if self.newIteration() or \ self.bfgs_obj.iteration == self.call_number - 1 == 1: self.prev_mecp_step.logHessianEigVals() self.prev_mecp_step.logStepInfo() self.prev_mecp_step.handleFinalGeometry(next_x) self.prev_mecp_step.logSummary() self.summary_lines.append(self.prev_mecp_step.summary_line) self.handleStructure(self.prev_mecp_step.x_initial, self.prev_mecp_step) try: self.prev_mecp_step.terminate(self.convergence_dict) except TerminateException as term_excp: if not self.verbose and self.logger: self.logger.info('') self.tearDownMaeWriter(term_excp)
[docs] def handleFiniteDifferences(self, next_x, finite_diff_call): """ Handle finite difference calls. :type next_x: numpy.array :param next_x: the next geometry vector (N X 1) :type finite_diff_call: int :param finite_diff_call: the index of a finite difference call :rtype: float, numpy.array :return: the energy and gradients (N X 1) """ self.astructure.setXYZ(vector_to_matrix(next_x)) launch_dir = self.base_name + '_finite_diff_' + str(finite_diff_call) if not os.path.exists(launch_dir): os.mkdir(launch_dir) jobs = self.runJobs(launch_dir) mecp_step = MECPStep(self.bfgs_obj.iteration, self.call_number, next_x, self.method, self.perp_factor, self.para_factor, self.verbose, self.logger, self.bfgs_obj) mecp_step.setJobData(jobs, self.prev_mecp_step) return mecp_step.weighted_energy, mecp_step.weighted_gradients
[docs] def getEnergyAndGradients(self, next_x): """ Return the energy and gradients that drive the MECP geometry optimization. :type next_x: numpy.array :param next_x: the next geometry vector (N X 1) :rtype: float, numpy.array :return: the energy and gradients (N X 1) """ if self.bfgs_obj.finite_diff_call: return self.handleFiniteDifferences(next_x, self.bfgs_obj.finite_diff_call) if self.prev_mecp_step and \ numpy.all(next_x == self.prev_mecp_step.x_initial): return self.prev_mecp_step.weighted_energy, self.prev_mecp_step.weighted_gradients if self.prev_mecp_step: self.finishPrevStep(next_x) if self.newIteration(): self.resetCallNumber() if self.bfgs_obj.iteration > self.max_iterations: self.handleStructure(next_x) msg = ('This optimization has reached the ' 'given maximum number of iterations, i.e. %s.') self.tearDownMaeWriter(IterationError(msg % self.max_iterations)) else: self.setUpIterationLaunchDir() self.astructure.setXYZ(vector_to_matrix(next_x)) self.logHeader() mecp_step = MECPStep(self.bfgs_obj.iteration, self.call_number, next_x, self.method, self.perp_factor, self.para_factor, self.verbose, self.logger, self.bfgs_obj) mecp_step.logInitialGeometry() launch_subdir = self.getLaunchSubDir() jobs = self.runJobs(launch_subdir) mecp_step.setJobData(jobs, self.prev_mecp_step) mecp_step.logJobData() self.prev_mecp_step = mecp_step self.call_number += 1 return mecp_step.weighted_energy, mecp_step.weighted_gradients
[docs]class MECP(object): """ Manages finding MECPs. """
[docs] def __init__(self, astructure, charge_1=DEFAULT_CHARGE_1, charge_2=DEFAULT_CHARGE_2, multip_1=DEFAULT_MULTIP_1, multip_2=DEFAULT_MULTIP_2, state_1=DEFAULT_STATE_1, state_2=DEFAULT_STATE_2, scf_gs=DEFAULT_SCF_GS, jaguar_kwargs=DEFAULT_KWARGS, iterations=DEFAULT_ITERATIONS, eps=DEFAULT_EPS, init_hess=DEFAULT_HESS, method=DEFAULT_METHOD, perp_factor=DEFAULT_PERP_FACTOR, para_factor=DEFAULT_PARA_FACTOR, convergence_dict=DEFAULT_CONVERGENCE_DICT, guess_geom=DEFAULT_GUESS_GEOM, all_geometries=DEFAULT_ALL_GEOMETRIES, all_results=DEFAULT_ALL_RESULTS, properties=DEFAULT_PROPERTIES, base_name=DEFAULT_BASE_NAME, tpp=DEFAULT_TPP, nresources=DEFAULT_NRESOURCES, verbose=DEFAULT_VERBOSE, logger=None): """ Create an instance. :type astructure: `schrodinger.structure.Structure` :param astructure: the structure for which the MECP is needed :type charge_1: int :param charge_1: net molecular charge of the first state :type charge_2: int :param charge_2: net molecular charge of the second state :type multip_1: int :param multip_1: multiplicity of the first state :type multip_2: int :param multip_2: multiplicity of the second state :type state_1: int :param state_1: the first state, 0 for ground state, N for the N-th excited state :type state_2: int :param state_2: the second state, 0 for ground state, N for the N-th excited state :type scf_gs: bool :param scf_gs: specify whether ground states should be determined using an SCF :type jaguar_kwargs: dict :param jaguar_kwargs: dictionary of Jaguar &gen section key/value pairs :type iterations: int :param iterations: the maximum number of MECP geometry optimization iterations :type eps: float :param eps: step size in Angstrom for any finite difference approximations :type init_hess: str :param init_hess: the type of initial Hessian to use :type method: str :param method: the method to use to determine the MECP :type perp_factor: float :param perp_factor: prefactor for the energy term whose gradient lies perpendicular to the crossing seam :type para_factor: float :param para_factor: prefactor for the energy term whose gradient lies parallel to the crossing seam :type convergence_dict: dict :param convergence_dict: contains various convergence thresholds :type guess_geom: str :param guess_geom: the initial guess geometry to use for the MECP geometry optimization, choices are GUESS_GEOM_CHOICES :type all_geometries: bool :param all_geometries: use this option to report all geometries, i.e. the geometries from all MECP geometries iterations will be reported in the output :type all_results: bool :param all_results: use this option to copy all subdirectories containing results from intermediate Jaguar force, etc. calculations back to the launch host :type properties: bool :param properties: whether to calculate properties of the MECP or not :type base_name: str :param base_name: a base name used to name the job and its related files :type tpp: int :param tpp: the number of threads to use for any intermediate Jaguar calculations, i.e. -TPP (threads-per-process) :type nresources: int :param nresources: the number of resources to use for Jaguar calculations, i.e. the number of threads * the number of processors :type verbose: bool :param verbose: specifies verbose logging :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ self.astructure = astructure self.charge_1 = charge_1 self.charge_2 = charge_2 self.multip_1 = multip_1 self.multip_2 = multip_2 self.state_1 = state_1 self.state_2 = state_2 self.scf_gs = scf_gs self.jaguar_kwargs = OrderedDict(jaguar_kwargs) self.iterations = iterations self.eps = eps self.init_hess = init_hess self.method = method self.perp_factor = perp_factor self.para_factor = para_factor self.convergence_dict = DEFAULT_CONVERGENCE_DICT.copy() self.convergence_dict.update(convergence_dict) self.guess_geom = guess_geom self.all_geometries = all_geometries self.all_results = all_results self.properties = properties self.base_name = base_name self.tpp = tpp self.nresources = nresources self.verbose = verbose self.logger = logger self.data = [(self.charge_1, self.multip_1, self.state_1), \ (self.charge_2, self.multip_2, self.state_2)] self.jobbe = jobcontrol.get_backend() self.nproc = None self.mae_out_file = self.base_name + OUT_FILE_SUFFIX self.mae_out_writer = None self.mae_out_all_file = None self.opt_1_data = {} self.opt_2_data = {} self.mecp_data = {}
[docs] def setSCFGS(self): """ Set the SCF ground state. """ if not self.scf_gs: self.scf_gs = not (self.multip_1 == 3 or self.multip_2 == 3)
[docs] def setJaguarKwargs(self): """ Set the Jaguar kwargs. """ for key in RESERVED_JAGUAR_KEYS: self.jaguar_kwargs.pop(key, None) defaults = OrderedDict(DEFAULT_KWARGS) if 'dftname' not in self.jaguar_kwargs: defaults.pop('dftname') defaults.update(self.jaguar_kwargs) self.jaguar_kwargs = defaults if not self.state_1 and not self.state_2 and self.scf_gs: self.jaguar_kwargs.pop('itddft') self.jaguar_kwargs.pop('itda')
[docs] def checkInput(self): """ Check input. """ jaguarworkflows.checkChargeAndMultiplicity(self.astructure, self.charge_1, self.multip_1) jaguarworkflows.checkChargeAndMultiplicity(self.astructure, self.charge_2, self.multip_2) CheckInput.checkChargesMultiplicitiesStates(self.charge_1, self.charge_2, self.multip_1, self.multip_2, self.state_1, self.state_2) CheckInput.checkParallelization(self.tpp, self.nresources) if not self.para_factor and self.logger: msg = ('The value of the para_factor parameter is 0.0 which ' 'reduces the MECP search to that of an arbitrary crossing ' 'point rather than a minimum energy crossing point. The ' 'arbitrary point may or may not be the minimum energy ' 'point as it will depend on the initial geometry.') self.logger.warning(msg)
[docs] def setParallelization(self): """ Set the parallelization options. """ self.nproc = old_div(self.nresources, self.tpp)
[docs] def setUpMaeWriter(self): """ Set up the Maestro output writer. """ self.mae_out_writer = structure.MaestroWriter(self.mae_out_file)
[docs] def tearDownMaeWriter(self, exception=None): """ Tear down the Maestro output writer and then raise an exception if there is one. :type exception: Exception or None :param exception: the exception to raise after tear down or None if there isn't one """ self.mae_out_writer.close() if exception is not None: raise exception
[docs] def logParams(self): """ Log the parameters. """ header = 'Parameters' charge_1 = textlogger.get_param_string('Charge 1', self.charge_1, MSG_WIDTH) multip_1 = \ textlogger.get_param_string('Multiplicity 1', self.multip_1, MSG_WIDTH) state_1 = textlogger.get_param_string('State 1', self.state_1, MSG_WIDTH) charge_2 = textlogger.get_param_string('Charge 2', self.charge_2, MSG_WIDTH) multip_2 = \ textlogger.get_param_string('Multiplicity 2', self.multip_2, MSG_WIDTH) state_2 = textlogger.get_param_string('State 2', self.state_2, MSG_WIDTH) scf_gs = textlogger.get_param_string('SCF ground state', self.scf_gs, MSG_WIDTH) iterations = \ textlogger.get_param_string('Iterations', self.iterations, MSG_WIDTH) eps = textlogger.get_param_string( 'Step size for finite differences / Ang.', self.eps, MSG_WIDTH) init_hess = \ textlogger.get_param_string('Initial Hessian', self.init_hess, MSG_WIDTH) method = \ textlogger.get_param_string('Method', self.method, MSG_WIDTH) perp_factor = textlogger.get_param_string('Perpendicular factor', self.perp_factor, MSG_WIDTH) para_factor = textlogger.get_param_string('Parallel factor', self.para_factor, MSG_WIDTH) delta_energy = textlogger.get_param_string( 'Convergence: Delta energy / Hartree', self.convergence_dict[DELTA_ENERGY], MSG_WIDTH) max_force = textlogger.get_param_string( 'Convergence: Max force / Hartree/Ang.', self.convergence_dict[MAX_FORCE], MSG_WIDTH) rms_force = textlogger.get_param_string( 'Convergence: RMS force / Hartree/Ang.', self.convergence_dict[RMS_FORCE], MSG_WIDTH) max_displacement = textlogger.get_param_string( 'Convergence: Max displacement / Ang.', self.convergence_dict[MAX_DISPLACEMENT], MSG_WIDTH) rms_displacement = textlogger.get_param_string( 'Convergence: RMS displacement / Ang.', self.convergence_dict[RMS_DISPLACEMENT], MSG_WIDTH) guess_geom = \ textlogger.get_param_string('Guess geometry', self.guess_geom, MSG_WIDTH) all_geometries = textlogger.get_param_string('All geometries', self.all_geometries, MSG_WIDTH) all_results = textlogger.get_param_string('All results', self.all_results, MSG_WIDTH) properties = textlogger.get_param_string('Properties', self.properties, MSG_WIDTH) base_name = textlogger.get_param_string('Base name', self.base_name, MSG_WIDTH) verbose = textlogger.get_param_string('Verbose logging', self.verbose, MSG_WIDTH) nproc = textlogger.get_param_string('Number of processors', self.nproc, MSG_WIDTH) tpp = textlogger.get_param_string('Threads-per-process (TPP)', self.tpp, MSG_WIDTH) self.logger.info(header) self.logger.info('-' * len(header)) self.logger.info(charge_1) self.logger.info(multip_1) self.logger.info(state_1) self.logger.info(charge_2) self.logger.info(multip_2) self.logger.info(state_2) self.logger.info(scf_gs) self.logger.info(iterations) self.logger.info(eps) self.logger.info(init_hess) self.logger.info(method) self.logger.info(perp_factor) self.logger.info(para_factor) self.logger.info(delta_energy) self.logger.info(max_force) self.logger.info(rms_force) self.logger.info(max_displacement) self.logger.info(rms_displacement) self.logger.info(guess_geom) self.logger.info(all_geometries) self.logger.info(all_results) self.logger.info(properties) self.logger.info(base_name) self.logger.info(verbose) self.logger.info(nproc) self.logger.info(tpp) self.logger.info('') header = 'Jaguar options' self.logger.info(header) self.logger.info('-' * len(header)) for pair in self.jaguar_kwargs.items(): self.logger.info('%s=%s' % pair) self.logger.info('')
[docs] def doStateOptimizations(self): """ Do the geometry optimizations for the individual states and record their data. """ data = [] if self.properties: data.extend(self.data) elif self.guess_geom == OPT_1: data.append(self.data[0]) elif self.guess_geom == OPT_2: data.append(self.data[1]) if not data: return if self.logger: msg = ('Performing geometry optimizations of requested state(s).') self.logger.info(msg) self.logger.info('') kwargs = OrderedDict(self.jaguar_kwargs) kwargs['igeopt'] = 1 jaguar_jobs = JaguarJobs(data=data, scf_gs=self.scf_gs, kwargs=kwargs, base_name=self.base_name, host=DEFAULT_HOST, nproc=self.nproc, tpp=self.tpp, logger=self.logger) launch_dir = self.base_name + '_' + STATE_OPTIMIZATIONS try: jobs = jaguar_jobs.runIt(self.astructure, in_templates=None, launch_dir=launch_dir) except JaguarError as err: if self.jobbe: self.jobbe.copyOutputFile(launch_dir) self.tearDownMaeWriter(err) if self.all_results and self.jobbe: self.jobbe.copyOutputFile(launch_dir) dicts = [{'structure': job.getFinalStructure(), \ 'energy': job.getFinalTotalEnergy()} for job in jobs] if self.properties: self.opt_1_data, self.opt_2_data = dicts set_properties_and_write_structure( self.opt_1_data['structure'], {TITLE_KEY: self.base_name + '_' + OPT_1}, self.mae_out_writer) set_properties_and_write_structure( self.opt_2_data['structure'], {TITLE_KEY: self.base_name + '_' + OPT_2}, self.mae_out_writer) if self.logger: msg = ('State %s minimum Energy / Hartree: %.8f') self.logger.info(msg % ('1', self.opt_1_data['energy'])) self.logger.info(msg % ('2', self.opt_2_data['energy'])) self.logger.info('') elif self.guess_geom == OPT_1: self.opt_1_data = dicts[0] elif self.guess_geom == OPT_2: self.opt_2_data = dicts[0]
[docs] def getGuessGeometry(self): """ Return the guess geometry. :rtype: numpy.array :return: the guess geometry (N X 1) """ msg = ('Using %s geometry as initial guess geometry.') if self.guess_geom == INPUT: x_0 = self.astructure.getXYZ() msg = msg % INPUT elif self.guess_geom == OPT_1: x_0 = self.opt_1_data['structure'].getXYZ() msg = msg % OPT_1 elif self.guess_geom == OPT_2: x_0 = self.opt_2_data['structure'].getXYZ() msg = msg % OPT_2 if self.logger: self.logger.info(msg) self.logger.info('') return matrix_to_vector(x_0)
[docs] def postProcessOptimization(self, energy_and_gradients): """ Post process the optimization. :type energy_and_gradients: EnergyAndGradients :param energy_and_gradients: the instance passed to the optimizer """ get_energy = lambda x: energy_and_gradients.astructure.property.get(x) get_barrier = lambda x, y: hartree_to_kcal_per_mol(x - y) self.mecp_data['structure'] = energy_and_gradients.astructure properties = {TITLE_KEY: self.base_name + '_' + DEFAULT_BASE_NAME} if energy_and_gradients.prev_mecp_step: if self.verbose and self.logger: energy_and_gradients.logSummary() energy_1, energy_2 = list( map(get_energy, [MECP_ENERGY_1_KEY, MECP_ENERGY_2_KEY])) if energy_1 is not None and energy_2 is not None: self.mecp_data['energy_1'] = energy_1 self.mecp_data['energy_2'] = energy_2 energies = { MECP_ENERGY_1_KEY: energy_1, MECP_ENERGY_2_KEY: energy_2 } properties.update(energies) if self.properties: barriers = {BARRIER_1_KEY: get_barrier(energy_1, self.opt_1_data['energy']), \ BARRIER_2_KEY: get_barrier(energy_2, self.opt_2_data['energy'])} properties.update(barriers) if self.logger: msg = ( '%s barrier (E_MECP - E_min) (State %s) / kcal/mol: %.3f' ) self.logger.info( msg % ('Forward', '1', barriers[BARRIER_1_KEY])) self.logger.info( msg % ('Reverse', '2', barriers[BARRIER_2_KEY])) self.logger.info('') set_properties_and_write_structure(self.mecp_data['structure'], properties, self.mae_out_writer)
[docs] def getBFGSObject(self, x_0): """ Return a BFGS object. :type x_0: numpy.array :param x_0: the guess geometry (N X 1) :rtype: BFGS :return: the object to manage the BFGS optimization """ kwargs = {} kwargs['c1'] = DEFAULT_C1 kwargs['c2'] = DEFAULT_C2 kwargs['amax'] = DEFAULT_MAX_STEP_SIZE kwargs['amin'] = DEFAULT_MIN_STEP_SIZE kwargs['xtol'] = DEFAULT_XTOL # set the max_force and max_iterations larger than that # requested to ensure that we handle the termination # from EnergyAndGradients.getEnergyAndGradients rather # than from BFGS.minimize kwargs['max_force'] = self.convergence_dict[MAX_FORCE] * 1E-10 kwargs['max_iterations'] = self.iterations + 1E10 kwargs['eps'] = self.eps kwargs['init_hess'] = self.init_hess kwargs['verbose'] = self.verbose kwargs['logger'] = self.logger return BFGS(**kwargs)
[docs] def getEnergyAndGradientsObject(self, bfgs_obj): """ Return a EnergyAndGradients object. :type bfgs_obj: BFGS :param bfgs_obj: the object to manage the BFGS optimization :rtype: EnergyAndGradients :return: the object to manage the energy and gradient calls for the BFGS optimization """ kwargs = {} kwargs['data'] = self.data kwargs['scf_gs'] = self.scf_gs kwargs['kwargs'] = self.jaguar_kwargs kwargs['base_name'] = self.base_name kwargs['all_results'] = self.all_results kwargs['convergence_dict'] = self.convergence_dict kwargs['max_iterations'] = self.iterations kwargs['method'] = self.method kwargs['perp_factor'] = self.perp_factor kwargs['para_factor'] = self.para_factor kwargs['all_geometries'] = self.all_geometries kwargs['host'] = DEFAULT_HOST kwargs['nproc'] = self.nproc kwargs['tpp'] = self.tpp kwargs['verbose'] = self.verbose kwargs['logger'] = self.logger kwargs['bfgs_obj'] = bfgs_obj return EnergyAndGradients(self.astructure, **kwargs)
[docs] def getOptKwargs(self, bfgs_obj): """ Return the kwargs for the optimization. :type bfgs_obj: BFGS :param bfgs_obj: the object to manage the BFGS optimization :rtype: dict :return: the parameters for the optimization """ kwargs = {} kwargs['method'] = bfgs_obj.minimize kwargs['jac'] = True return kwargs
[docs] def optimize(self): """ Optimize the geometry to the MECP. """ if DEBUG is None: x_0 = self.getGuessGeometry() else: nvar = 3 * self.astructure.atom_total x_0 = get_guess(nvar=nvar) bfgs_obj = self.getBFGSObject(x_0) energy_and_gradients = self.getEnergyAndGradientsObject(bfgs_obj) kwargs = self.getOptKwargs(bfgs_obj) if self.logger: self.logger.info('Starting optimization') self.logger.info('') try: optimize_result = \ scipy.optimize.minimize(energy_and_gradients.getEnergyAndGradients, x_0, **kwargs) except TerminateException as term_excp: exception = None if self.logger: self.logger.info(str(term_excp)) self.logger.info('') except (IterationError, JaguarError, MinimizerError) as err: exception = err self.postProcessOptimization(energy_and_gradients) if exception: if energy_and_gradients.mae_out_writer: energy_and_gradients.mae_out_writer.close() self.tearDownMaeWriter(exception) if self.logger: self.logger.info('Finished optimization') self.logger.info('') self.mae_out_all_file = energy_and_gradients.mae_out_file
[docs] def doIncorporation(self): """ Incorporate output Maestro files. """ zip_out_name = self.base_name + '_all.zip' out_files = [self.mae_out_file] if self.all_geometries: out_files.append(self.mae_out_all_file) jobutils.zip_and_set_incorporation(zip_out_name, out_files)
[docs] def runIt(self): """ Main function to find the MECP. """ self.setSCFGS() self.setJaguarKwargs() self.checkInput() self.setParallelization() if self.logger: self.logParams() self.setUpMaeWriter() if self.jobbe: self.jobbe.addLogFile(self.mae_out_file) self.doStateOptimizations() self.optimize() self.doIncorporation() self.tearDownMaeWriter()
[docs]def vector_to_matrix(vector): """ Convert vector to matrix. :type vector: numpy.array :param vector: N X 1 array :rtype: numpy.array :return: N/3 X 3 array """ natoms = old_div(vector.size, 3) return numpy.reshape(vector, (natoms, 3))
[docs]def matrix_to_vector(matrix): """ Convert matrix to vector. :type matrix: numpy.array :param matrix: N/3 X 3 array :rtype: numpy.array :return: N X 1 array """ return matrix.flatten()
[docs]def get_max_and_rms_and_len(matrix): """ Return the max, RMS, and length of the given matrix. :type matrix: numpy.array :param matrix: the matrix :rtype: float, float, float :return: the max, RMS, and length of the given matrix """ amax = max([abs(numpy.amin(matrix)), abs(numpy.amax(matrix))]) arms = numpy.sqrt(numpy.mean(numpy.square(matrix))) alen = transform.get_vector_magnitude(matrix_to_vector(matrix)) return amax, arms, alen
[docs]def log_vector(vector, header=None, log_sums=False, logger=None): """ Log or print the given vector in a natoms X 3 format. :type vector: numpy.array :param vector: the vector to log (N X 1) :type header: str or None :param header: a header or None if there isn't one :type log_sums: bool :param log_sums: whether to log the sums of x, y, and z over atoms :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ spaces = 3 * ' ' idx_width = 5 xyz_width = 15 afunc = lambda i, x, y, z: spaces.join([ i.ljust(idx_width), x.rjust(xyz_width), y.rjust(xyz_width), z.rjust(xyz_width) ]) lines = [] if header is not None: lines.extend([header, '-' * len(header)]) first_row = afunc('atom', 'x', 'y', 'z') lines.extend([first_row, '-' * len(first_row)]) sums = numpy.zeros(3) for idx, row in enumerate(vector_to_matrix(vector), 1): sums += row xyz = tuple(['%.8f' % coord for coord in row]) lines.append(afunc(str(idx), *xyz)) if log_sums: sums = tuple(['%.8f' % coord for coord in sums]) lines.append(afunc('total', *sums)) if logger: for line in lines: logger.info(line) else: for line in lines: print(line)
[docs]def log_geometry(geometry, header=DEFAULT_GEOMETRY_HEADER, logger=None): """ Log or print the geometry in a natoms X 3 format. :type geometry: numpy.array :param geometry: the geometry to log (N X 1) :type header: str :param header: a header :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ log_vector(geometry, header=header, logger=logger)
[docs]def log_displacements(displacements, max_displacement, rms_displacement, len_displacement, header=DEFAULT_DISPLACEMENTS_HEADER, logger=None): """ Log or print the displacements in a natoms X 3 format. :type displacements: numpy.array :param displacements: the displacements to log (N X 1) :type max_displacement: float :param max_displacement: the magnitude of the largest displacements element :type rms_displacement: float :param rms_displacement: the RMS of displacements :type len_displacement: float :param len_displacement: the length of displacements :type header: str :param header: a header :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ log_vector(displacements, header=header, log_sums=True, logger=logger) lines = ['Max: %.8f' % max_displacement] lines.append('RMS: %.8f' % rms_displacement) lines.append('Length: %.8f' % len_displacement) if logger: for line in lines: logger.info(line) else: for line in lines: print(line)
[docs]def log_energy_and_forces(energy, forces, max_force, rms_force, len_force, energy_header=DEFAULT_ENERGY_HEADER, forces_header=DEFAULT_FORCES_HEADER, logger=None): """ Log or print the energy and forces in a natoms X 3 format. :type energy: float or None :param energy: the energy or None if there isn't one :type forces: numpy.array :param forces: the forces to log (N X 1) :type max_force: float :param max_force: the magnitude of the largest forces element :type rms_force: float :param rms_force: the RMS of forces :type len_force: float :param len_force: the length of forces :type energy_header: str :param energy_header: an energy header :type forces_header: str :param forces_header: a forces header :type logger: logging.Logger or None :param logger: output logger or None if there isn't one """ if energy is not None: energy_line = energy_header + ': %.8f' % energy if logger: logger.info(energy_line) else: print(energy_line) log_vector(forces, header=forces_header, log_sums=True, logger=logger) lines = ['Max: %.8f' % max_force] lines.append('RMS: %.8f' % rms_force) lines.append('Length: %.8f' % len_force) if logger: for line in lines: logger.info(line) else: for line in lines: print(line)
[docs]def reciprocal_bohr_to_angstrom(array): """ Return the given array converted from units of reciprocal Bohr to reciprocal Angstrom. :type array: numpy.array :param array: the array to convert :rtype: numpy.array :return: the converted array """ factor = old_div(scipy.constants.angstrom, \ scipy.constants.physical_constants['Bohr radius'][0]) return factor * array
[docs]def ev_to_hartree(energy): """ Return the given energy converted from eV to Hartree. :rtype: float :return: energy in Hartree """ key = 'electron volt-hartree relationship' return scipy.constants.physical_constants[key][0] * energy
[docs]def hartree_to_kcal_per_mol(in_hartree): """ Return the given energy converted from Hartree to kcal/mol. :type in_hartree: float :param in_hartree: energy in Hartree :rtype: float :return: energy in kcal/mol """ key = 'hartree-joule relationship' in_joule = in_hartree * scipy.constants.physical_constants[key][0] in_kcal_per_mol = old_div(in_joule, scipy.constants.calorie) return in_kcal_per_mol * scipy.constants.N_A / scipy.constants.kilo
[docs]def set_properties_and_write_structure(astructure, properties, writer): """ Set the given properties on the structure and write it using the given writer. :type astructure: `schrodinger.structure.Structure` :param astructure: the structure to write :type properties: dict :param properties: key/value pairs of properties to write :type writer: structure.MaestroWriter :param writer: the writer """ for key, value in properties.items(): astructure.property[key] = value if writer: writer.append(astructure)
[docs]def get_angle_in_degrees(vec_1, vec_2): """ Return the angle in units of degrees between the given two vectors. :type vec_1: numpy.array :param vec_1: the first vector :type vec_2: numpy.array :param vec_2: the second vector :rtype: float :return: the angle in degrees """ angle = transform.get_angle_between_vectors(vec_1, vec_2) return numpy.rad2deg(angle)