Source code for schrodinger.application.desmond.packages.viparr1.viparr.viparr

#!/usr/bin/env desres-exec
#{
# exec desres-cleanenv -eUSER -eVIPARR_FFDIR -eVIPARR_PDIR \
# -m Python/2.5.1-09/bin \
# -m destro/0.4.7.1-D/lib-python \
# -m simplejson/1.7.4-1+Release \
# -- python $0 "$@"
#}

# viparr - force field parameter assignment program
# (Violet "Vi" Parr is the girl in The Incredibles who generates the force fields.)

import glob
import optparse
import platform
import six
import os
import sys

from schrodinger.application.desmond.packages.destro import Maeff

from .. import get_ffdir
from .. import get_pdir
from . import viparr_merge
from . import viparr_residue
from .version import viparr_version
from .viparr_plugins_util import parse_json
from .viparr_util import CtAtoms, CtBonds, ffio_add_atoms_block, \
    ffio_add_block, ffio_add_header, ffio_add_pseudo_block

#import templates_check

if platform.system() == "Linux":
    # Currently this causes a crash on Windows.
    # The default precision appears to be 8 anyway...
    from schrodinger.application.desmond.packages.destro import \
        setglobalprecision
    setglobalprecision(8)


[docs]class Struct(object): pass
[docs]class Atom(object): pass
[docs]def templates_convert(templates): '''Convert templates structure to internal data structure. ''' class Atom(): pass for (name, res) in templates.items(): atoms = dict() # atom structure is: # [aname, atomic number, charge, [atypes]] for a in res['atoms']: atom = Atom() atom.an = a[1] atom.charge = a[2] atom.atypes = a[3] atoms[a[0]] = atom templates[name]['atoms'] = atoms
[docs]def compress_residue_list(residue_list): '''Compress FF representation if possible ''' if len(residue_list) == 0: return if residue_list[0] == None: return first = residue_list[0].name for res in residue_list: if res == None: break if res.name != first: break else: # natural termination of loop means all residues are the same # now check the first residue to make sure all atom indices are contiguous # if so, then this residue list should only contain one residue if max(residue_list[0].nlist) + 1 == len(residue_list[0].nlist): del residue_list[1:] print('Compressing FF representation...')
[docs]def get_atom_list(atoms, neighbors, templates, residue_list): '''Return a list of atoms in the ct structure. Each atom has the following fields: pdbname, pdbres, resnum, atomic number, charge, atomtypes. Also check that all residue and atom names are in the templates data structure. ''' class Atom(object): pass num_atoms = len(atoms) # above is correct unless we have compressed the residue list if len(residue_list) == 1: if residue_list[0] != None: num_atoms = len(residue_list[0].nlist) atom_list = [None] * num_atoms for ires, res in enumerate(residue_list): if res != None: # create marker to tell if atoms are polarizable # Yuck... This needs to go...(its used in bondfields) # and is specific to drude polarization polist = set([ p[4] for p in templates[res.name].get('pseudos', []) if p[3].startswith("drude_") ]) for i, n in enumerate(res.nlist): # atom numbers are 0-based a = Atom() a.type = 'atom' a.pdbname = res.alist[i] a.pdbres = res.name a.tmplname = res.name a.resnum = ires + 1 # 1-based a.neighbors = neighbors[n] a.index = n obj = templates[res.name]['atoms'][a.pdbname] a.an = obj.an a.charge = obj.charge a.atypes = obj.atypes if a.pdbname in polist: a.polnotpol = True else: a.polnotpol = False atom_list[n] = a return atom_list
[docs]def get_neighbor_list(atom_list): '''Return list of neighbor lists; one neighbor list for each atom, in order. Each neighborlist is guaranteed to be sorted. Entries are 1-based. ''' neighbor_list = list() for a in atom_list: if a == None: neighbor_list.append([]) else: neighbor_list.append(a.neighbors) return neighbor_list
[docs]def get_bond_list(neighbor_list): '''Return list of all bonds (i,j) such that i<j. List will be sorted meaningfully. Entries are 1-based. ''' bond_list = list() bond_set = set() num_atoms = len(neighbor_list) for i in range(num_atoms): for j in neighbor_list[i]: if neighbor_list[j - 1] == []: raise AssertionError( 'A bond (%d,%d) straddles your force fields' % (i + 1, j)) if i + 1 < j: bond = (i + 1, j) if bond not in bond_set: bond_list.append(bond) bond_set.add(bond) return bond_list
[docs]def get_angle_list(neighbor_list): '''Return list of all angles (i,j,k) such that i<k. List will be sorted meaningfully. Entries are 1-based. ''' angle_list = list() num_atoms = len(neighbor_list) for i in range(num_atoms): b = neighbor_list[i] # assumes b[0]<b[1]<b[2]<b[3] nb = len(b) index = i + 1 if nb > 6: six.reraise(AssertionError, 'Atom %d has more than six bonds:' % index, b) for at0 in range(nb): for at2 in range(at0 + 1, nb): angle_list.append((b[at0], index, b[at2])) angle_list.sort() return angle_list
[docs]def get_proper_list(neighbor_list, bond_list): '''Return list of all propers (i,j,k,l) such that i<l. List will be sorted meaningfully. Entries are 1-based. ''' proper_list = list() for b in bond_list: # contains only one of (i,j) and (j,i) # loop over possible (1,4)'s for n0 in neighbor_list[b[0] - 1]: if n0 == b[1]: continue for n1 in neighbor_list[b[1] - 1]: if n1 == b[0]: continue if n1 == n0: # triangle continue if n0 < n1: proper_list.append((n0, b[0], b[1], n1)) else: proper_list.append((n1, b[1], b[0], n0)) proper_list.sort() return proper_list
[docs]def get_bonded_term_list(fieldname, templates, residue_list, sorted): '''Return a list of bonded items. Each bonded item is a list, namely: [i,j] when fieldname = 'bonds' or 'exclusions'. if sorted, i<j [i,j,k] when fieldname = 'angles'. if sorted, i<k [i,j,k,l] when fieldname = 'propers' or 'impropers'. if sorted, i<l The returned list will be sorted in a meaningful way. ''' term_list = list() for res in residue_list: if res == None: continue # ignore residue if templates does not know about it name = res.name terms = templates[name].get(fieldname, None) if terms == None: continue for dih in terms: nk = len(dih) d = list() for atom in dih: a = res.atoms.get(atom) if a == None: raise AssertionError( 'In residue <"%s",%d> (%s), did not find atom %s' % (res.chain, res.resnum_orig, res.name_orig, atom)) d.append(a + 1) if len(d) == nk: if d[0] > d[-1] and sorted: d.reverse() # use sorted order i<j, i<k, i<l term_list.append(tuple(d)) else: print('Warning: some bonded terms were not found', nk) term_list.sort() return term_list
[docs]def merge(list0, list1): '''Merge list1 into list0, as long as there are no entries in common. ''' if list1 != None and list1 != []: for p in list1: if p in list0: print(p, list1, list0) raise AssertionError('Cannot merge lists; already in list') else: list0.append(p) list0.sort()
[docs]def get_pseudo_terms(iffld, pseudo_residue_list, nres, nstart): '''Return drude and virtual particle list (numbering starting at nstart) and list of 'bonds' between drudes, virtual sites and particles. The pseudo_list can be appended to the atom list. ''' if nres != 1 and nres != len(pseudo_residue_list): raise AssertionError('Mismatched residue and pseudo residue lists') pseudo_term_list = list() pseudo_bond_list = list() index = nstart - 1 for pres in pseudo_residue_list[:nres]: for a in pres: index += 1 if (iffld != a.ffldidx): pseudo_term_list.append(None) continue if hasattr(a, 'index') and a.index != index - 1: raise AssertionError('Bad Pseudo Indexes') a.index = index - 1 pseudo_term_list.append(a) # pseudo 'bonds' pseudo_bond_list.append((a.partner_index, index)) if hasattr(a, 'partner_index2'): pseudo_bond_list.append((a.partner_index2, index)) return pseudo_term_list, pseudo_bond_list
[docs]def update_pseudo_residue_list(iffld, pseudo_residue_list, atoms, templates, residue_list): '''pseudo_list is a list of all residues in ct block, and each residue is a list of pseudo particles. ''' for ires, res in enumerate(residue_list): if res == None: continue # ignore if do not recognize # ignore if pseudo_list already has an entry for this residue if len(pseudo_residue_list[ires]) != 0: print('Warning: pseudo_list already has entry for residue') continue li = templates[res.name].get('pseudos', []) for line in li: nsite = line[4] #"primary site" of pseudo site = res.atoms[nsite] # clone first atom in site definition a = Atom() a.ffldidx = iffld a.type = 'pseudo' a.pdbname = line[0] a.chain = atoms[site].chain a.resnum = atoms[site].resnum a.pdbres = atoms[site].pdbres a.tmplname = res.name a.x = atoms[site].x a.y = atoms[site].y a.z = atoms[site].z a.charge = line[1] a.atypes = line[2] a.field = line[3] a.terms = line[4:] a.partner_atom = nsite a.partner_index = res.atoms[nsite] + 1 if a.field in ['virtual_midpoint']: a.partner_index2 = res.atoms[line[5]] + 1 if hasattr(atoms[site], 'grp_thermostat'): a.grp_thermostat = atoms[site].grp_thermostat if hasattr(atoms[site], 'grp_energy'): a.grp_energy = atoms[site].grp_energy if (not (a.field.startswith("virtual_") or a.field.startswith("drude_"))): raise AssertionError('Unrecognized pseudo type: %s' % (str(a.field))) pseudo_residue_list[ires].append(a)
[docs]def construct_lists(iffld, atoms, neighbors, rules, templates, residue_list, pseudo_residue_list): '''Return lists that describe the structure. ''' compress_residue_list(residue_list) atom_list = get_atom_list(atoms, neighbors, templates, residue_list) neighbor_list = get_neighbor_list(atom_list) bond_list = get_bond_list(neighbor_list) angle_list = get_angle_list(neighbor_list) proper_list = get_proper_list(neighbor_list, bond_list) # Keep input order for impropers (the order may be signigicant depending on how the coordinate is calculated) improper_list = get_bonded_term_list('impropers', templates, residue_list, False) extra_angle_list = get_bonded_term_list('angles', templates, residue_list, True) extra_proper_list = get_bonded_term_list('propers', templates, residue_list, True) extra_excl_list = get_bonded_term_list('exclusions', templates, residue_list, True) # drude and virtual lists pseudo_list, pseudo_bond_list = get_pseudo_terms(iffld, pseudo_residue_list, len(residue_list), len(atom_list) + 1) atom_list.extend(pseudo_list) merge(angle_list, extra_angle_list) merge(proper_list, extra_proper_list) # Setup for simultaneous generation of pairs and exclusions. # default arguments in rules.get() are for backwards compatibility excl_rule = rules.get('exclusions', 4) if excl_rule < 1 or excl_rule > 4: raise AssertionError( "Invalid exclusion rule: %d. 'exclusions' must be between 1 and 4 inclusive" % (excl_rule)) es_scale_list = rules.get('es_scale', [0.0] * (excl_rule - 1)) lj_scale_list = rules.get('lj_scale', [0.0] * (excl_rule - 1)) if (len(es_scale_list) != excl_rule - 1 or len(lj_scale_list) != excl_rule - 1): raise AssertionError( "Invalid 'es_scale' (%d) or 'lj_scale' (%d) list lengths... Should be %d" % (len(es_scale_list), len(lj_scale_list), excl_rule - 1)) # Keep track of (1,x) pairs separately while generating exclusions. # Pair terms are only genertated up to and including the requested exclusion depth # 1-2 pairs: particles separated by one bond and not in the extra exclusion list # 1-3 pairs: particles separated by two bonds and not 1-{2 } pairs and not in the extra exclusion list # 1-4 pairs: particles separated by three bonds and not 1-{2,3} pairs and not in the extra exclusion list pair_list = [] excl_list = set(extra_excl_list) if excl_rule >= 2: tmp = set([(a[0], a[1]) for a in bond_list]) tmp.difference_update(excl_list) pair_list.append(tmp) excl_list.update(tmp) if excl_rule >= 3: tmp = set([(a[0], a[2]) for a in angle_list]) tmp.difference_update(excl_list) pair_list.append(tmp) excl_list.update(tmp) if excl_rule >= 4: tmp = set([(a[0], a[3]) for a in proper_list]) tmp.difference_update(excl_list) pair_list.append(tmp) excl_list.update(tmp) excl_list = list(excl_list) excl_list.sort() for i, p in enumerate(pair_list): p = list(p) p.sort() pair_list[i] = p lists = Struct() lists.atom_list = atom_list lists.neighbor_list = neighbor_list lists.residue_list = residue_list lists.bond_list = bond_list lists.angle_list = angle_list lists.proper_list = proper_list lists.improper_list = improper_list lists.excl_list = excl_list lists.pair_list = pair_list lists.es_scale_list = es_scale_list lists.lj_scale_list = lj_scale_list lists.pseudo_list = pseudo_list lists.pseudo_bond_list = pseudo_bond_list # UNDONE: excl and pairs and cmap are lists of tuples, unlike the other lists return lists
[docs]def merge_blocks(first, second): '''Merge second block into first; print warning message if element in second block is already in the first block. Assumes blocks themselves hold unique entries. First block is the merged one. ''' firstset = set([tuple([tuple(f[0]), f[1]]) for f in first]) for e in second: key = tuple([tuple(e[0]), e[1]]) if key in firstset: print('Ignoring subsequent match:', e) else: first.append(e)
[docs]def merge_blocks2(first, second): '''Merge first block into second; print warning message if element in first block will override element in second block. Assumes blocks themselves hold unique entries. Second block is the merged one. [NOT THE SAME BEHAVIOR AS ABOVE.] ''' seconddict = dict() for i, e in enumerate(second): t = tuple([tuple(e[0]), e[1]]) seconddict[t] = i for f in first: key = tuple([tuple(f[0]), f[1]]) if key in seconddict: ind = seconddict[key] print('Ignoring subsequent match:', second[ind]) second[ind] = f else: second.append(f)
[docs]def add_water_constraints(ff_handle): """ Add an ffio_constraints block to the ff_handle for a water ct by reading the ffio_c1 values from the angles and bonds, and append '_constrained' to the functional forms of the angles and bonds. :param ff_handle: the handle of the ffio_block :type ff_handle: destro.Destro """ # this will be a list of ffio_c1 values for [angle, bond1, bond2] ffio_c1s = [] for block in [ff_handle.ffio_angles, ff_handle.ffio_bonds]: for row in block: funct = row.ffio_funct if not funct.endswith('_constrained'): row.ffio_funct = funct + '_constrained' ffio_c1s.append(row.ffio_c1) block = [[[1, 2, 3], 'HOH', ffio_c1s]] del ff_handle.ffio_constraints ffio_add_block(ff_handle, 'ffio_constraints', block)
################################################################################
[docs]def main(vargs=[]): # noqa: M511 usage = ''' %prog [options] mae_file maeff_outfile Description: viparr is a force field parameter assignment program. -f and -d are used to specify force fields; the order of force fields is important: earlier ones take precedence over later ones. Simple example: %prog -f charmm27 -f spc mae_file maeff_outfile More complicated example: %prog -d me/myfiles/myff -f charmm27 -f spc mae_file maeff_outfile ''' def parser_add_directory(option, opt_str, value, parser, ffdir): if ffdir == None: raise AssertionError('VIPARR_FFDIR environment variable undefined; '+ \ 'no built-in force fields can be specified') parser.values.forcefield.append(os.path.join(ffdir, value)) def parser_merge(option, opt_str, value, parser): if len(parser.values.forcefield) == 0: raise AssertionError('Specify -f or -d before -m') print('Merging <%s> and <%s>' % (parser.values.forcefield[-1], value)) destdir = viparr_merge.merge(parser.values.forcefield[-1], value) print('Using temporary directory <%s>' % destdir) parser.values.forcefield[-1] = destdir ffdir = get_ffdir() pdir = get_pdir() if not ffdir: usage += 'VIPARR_FF_PATH is empty: No built-in force fields' else: usage += 'Available built-in force fields:\n ' temp = os.listdir(ffdir) temp.sort() usage += '\n '.join(temp) opt = optparse.OptionParser(usage=usage, version=viparr_version) opt.set_defaults(ctnum=None) opt.add_option('-c', type='int', dest='ctnum', action="append", help='''run viparr for selected CT blocks only, e.g., for the first and third CT blocks, use "-c 1 -c 3"''') opt.add_option('-f', type='string', dest='ffname', action='callback', callback=parser_add_directory, callback_args=(ffdir,), help='''built-in force field name; several can be listed, each preceded by its own -f''') opt.add_option('-d', action='append', dest='forcefield', default=[], help='''user-provided force field directory; several can be listed, each preceded by its own -d''') opt.add_option('-m', type='string', dest='merge_dir', action='callback', callback=parser_merge, help='''path to user-defined force field directory that is to be merged with previously specified force field; several can be listed, each preceded by its own -m''') opt.add_option('-n', dest='ffio_name', default='Generated by viparr', help='force field name to put into output file') opt.add_option('-p', dest='plugindir', default=pdir, help='override for plugins directory (advanced usage)') opt.add_option('-x', action="store_true", dest='no_header', default=False, help='do not print header info (for testing purposes)') opt.add_option('-v', action="store_true", dest='verbose', default=False, help='verbose output') opt.add_option( '-w', action="store_true", dest='is_water', default=False, help='use this option when running viparr on a water ct, this ' 'will recreate any constraints block that is removed') if (vargs == []): opts, args = opt.parse_args() else: opts, args = opt.parse_args(vargs) print('viparr', viparr_version) # check number of arguments first if len(args) != 2: opt.error('incorrect number of arguments: %d %s' % (len(args), str(args))) mae_file = args[0] maeff_outfile = args[1] # check that there is at least one forcefield if len(opts.forcefield) < 1: raise AssertionError('No force field specified') # check that each directory exists print('All FF directories, in order:\n', '\n'.join(opts.forcefield)) for dir in opts.forcefield: if not os.path.isdir(dir): raise AssertionError('Force field dir <%s> is not a directory' % dir) print('Plug-in directory:', opts.plugindir) if opts.plugindir == None: raise AssertionError('No plug-in directory specified with -p or with '+ \ 'VIPARR_PDIR environment variable') else: if not os.path.isdir(opts.plugindir): raise AssertionError('Plug-in dir <%s> is not a directory' % opts.plugindir) # open maestro file print('Reading maestro file <%s> ...' % mae_file) if mae_file.endswith('.mae.gz') or mae_file.endswith('.maegz'): import gzip mae_buffer = gzip.open(mae_file, 'rb').read() M = Maeff(mae_buffer) else: with open(mae_file) as f: M = Maeff(f) print('Done reading maestro file') num_structures = len(M) # create structure list, which is 0-based list of ct numbers to process if opts.ctnum == None: structure_list = list(range(num_structures)) else: structure_list = [] for e in opts.ctnum: if e > num_structures or e < 1: raise AssertionError('Bad value <%d> for selected CTNUM' % e) structure_list.append(e - 1) # skip structure if it is a "full_system" for s in structure_list: if M[s + 1].__contains__('ffio_ct_type'): if M[s + 1].ffio_ct_type == 'full_system': print('Skipping full_system structure <%d>' % (s + 1)) structure_list.remove(s) if len(structure_list) == 0: print('No CT blocks to process...exiting') sys.exit(1) # process the atoms and bonds in each ct # this should be the only place that use CtBonds and CtAtoms # ct_atoms and ct_bonds may have entries that are None ct_atoms = [None] * num_structures ct_bonds = [None] * num_structures for s in structure_list: print('Analyzing ct block %d' % (s + 1)) ct = M[s + 1] ct_bonds[s] = CtBonds(ct) ct_atoms[s] = CtAtoms(ct) # residue list for each structure for each force field all_residue_lists = [[[] for i in range(num_structures)] \ for j in range(len(opts.forcefield))] ###################################################################### # Loop for constructing the residue lists # We do this as a pre-processing step so that we can check that all residues # were matched by some template for idir, dir in enumerate(opts.forcefield): templates = {} tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))] for tmpl in tnames: viparr_merge.update_templates_strict(templates, tmpl) #templates_check.templates_check(templates) viparr_residue.templates_reorder_atoms(templates) # loop through all ct blocks for s in structure_list: print('******Constructing residue list for <%s> Structure %d' % (dir, s + 1)) residue_list = viparr_residue.get_residue_list( ct_atoms[s], ct_bonds[s], templates) # save this residue list all_residue_lists[idir][s] = residue_list ###################################################################### # Check that all residues were matched by some force field # and print diagnostic messages. # Also do the essential step of overwriting unmatched residues with 'None' derror = dict() dwarn = dict() dmatch = dict() num_messages = 5 # number of messages per residue name num_errors = 0 for s in structure_list: for i in range(len(all_residue_lists[0][s])): matches = [] for idir, dir in enumerate(opts.forcefield): res = all_residue_lists[idir][s][i] if res.match: matches.append(dir) if res.name_orig != res.name: pair = (res.name_orig, res.name) dmatch[pair] = dmatch.setdefault(pair, 0) + 1 if dmatch[pair] <= num_messages: print('Note: Struct %d, Res <"%s",%d> (%s) matched (%s)' % \ (s +1, res.chain, res.resnum_orig, res.name_orig, res.name)) if dmatch[pair] == num_messages: print( 'Ignoring further messages for (%s) matching (%s)' % pair) if len(matches) != 1: res = all_residue_lists[idir][s][i] which = (s + 1, res.chain, res.resnum_orig, res.name_orig) if len(matches) == 0: num_errors = num_errors + 1 derror[res.name_orig] = derror.setdefault(res.name_orig, 0) + 1 if derror[res.name_orig] <= num_messages: formula = viparr_residue.residue_formula( [ct_atoms[s][ii].atomicnum for ii in res.nlist]) print( 'Error: Struct %d, Res <"%s",%d> (%s) not matched by any force field template' % which, 'with formula', formula) if derror[res.name_orig] == num_messages: print('Ignoring further errors for (%s)' % res.name_orig) elif len(matches) > 1: dwarn[res.name_orig] = dwarn.setdefault(res.name_orig, 0) + 1 if dwarn[res.name_orig] <= num_messages: print( 'Warning: Struct %d, Res <"%s",%d> (%s) matched by multiple force fields:' % which, matches) if dwarn[res.name_orig] == num_messages: print('Ignoring further warnings for (%s)' % res.name_orig) for idir, dir in enumerate(opts.forcefield): if all_residue_lists[idir][s][i].match == False: all_residue_lists[idir][s][i] = None # overwrite if num_errors > 0: sys.exit(1) ################################################################################### # Get the combining rules and function type and check that they are consistent # also check the forcefield versions for incompatability comb_rule = set() vdw_func = set() for dir in opts.forcefield: rules = parse_json(os.path.join(dir, 'rules')) minver = rules.get('min_viparr_version', viparr_version) if (list(map(int, minver.split('.'))) > list( map(int, viparr_version.split('.')))): raise AssertionError( 'Forcefield %s needs viparr version >= %s. Current version = %s' % (dir, minver, viparr_version)) x = rules.get('vdw_comb_rule', '').upper() if x == '': x = 'WATER' comb_rule.add(x) x = rules.get('vdw_func', 'LJ12_6_sig_epsilon') vdw_func.add(x) if len(vdw_func) != 1: print(vdw_func) raise AssertionError('Inconsistent vdW func rules among force fields') vdw_func = vdw_func.pop() if comb_rule == set(['WATER', 'ARITHMETIC/GEOMETRIC']) or \ comb_rule == set(['WATER', 'GEOMETRIC']): comb_rule.remove('WATER') if comb_rule == set(['WATER']): comb_rule = set(['ARITHMETIC/GEOMETRIC']) if len(comb_rule) != 1: print(comb_rule) raise AssertionError('Inconsistent vdW mixing rules among force fields') comb_rule = comb_rule.pop() ################################################################################### # Get the exclusion rules and check that they are consistent # This is NOT necessary! We already fail if a bond straddles 2 forcefields. # This prevents us from needing to worry about mixing 2 different exclusion rules # as exclusions/pair are based on topology... # #exclusion_rule = [] #for dir in opts.forcefield: # rules = parse_json(os.path.join(dir, 'rules')) # x = rules.get('exclusions', 4) # if( x not in exclusion_rule): # exclusion_rule.append(x) #if len(exclusion_rule) != 1: # print exclusion_rule # raise AssertionError, 'Inconsistent exclusion rules among force fields' #exclusion_rule = exclusion_rule[0] ###################################################################### # Main loop algorithm: # Loop over the structures in the ct # Loop over the force fields (some duplicate work here if multiple structures) # Loop over the plug-ins # Apply plug-in for s in structure_list: print('******Main loop: Structure', s + 1) strules = [] sttemplates = [] # length is num_residues in structure pseudo_residue_list = [[] for i in range(len(all_residue_lists[0][s]))] # We need all the pseudos in the structure now so we can get the site indexing correct for idir, dir in enumerate(opts.forcefield): print('Pre-processing force field <%s> for rules and pseudos' % dir) strules.append(parse_json(os.path.join(dir, 'rules'))) templates = {} tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))] for tmpl in tnames: viparr_merge.update_templates_strict(templates, tmpl) templates_convert(templates) # update pseudo_list update_pseudo_residue_list( idir, pseudo_residue_list, ct_atoms[s], templates, all_residue_lists[idir][s]) # before compression sttemplates.append(templates) tables = Struct() # the tables describing the chemical structure blocks = dict( ) # dictionary of blocks that will be written to the mae file info = list( ) # list of strings which are references for the force fields info.append(('Plugins: <%s>' % opts.plugindir, '')) for idir, dir in enumerate(opts.forcefield): print('Processing force field <%s>' % dir) tables.rules = strules[idir] tables.templates = sttemplates[idir] # construct lists that describe the chemical system for this force field lists = construct_lists(idir, ct_atoms[s], ct_bonds[s], tables.rules, tables.templates, all_residue_lists[idir][s], pseudo_residue_list) # so rules can be passed to plugins... # Viparr should take the position that rules are for internal use only. # This dosent stop viparr from passing certain of these values to plugins however (see construct_lists) # # lists.rules = tables.rules lists.comb_rule = comb_rule lists.vdw_func = vdw_func # so that ff directory is passed to plugins... lists.dir = dir # pass templates lists.templates = tables.templates # pass processed blocks lists.blocks = {} #info.append('FF dir: <%s>' % lists.dir) # force field directory #info.extend(lists.rules.get('info', ['No info'])) ffd = 'FF: <%s>' % lists.dir data = tables.rules.get('info', ['No info']) for d in data: info.append((ffd, d)) # loop over plug-ins pluglist = [p[0] for p in tables.rules['plugins']] if ('atoms' in pluglist and pluglist[-1] != 'atoms'): raise UserWarning( "atoms plugin should be last plugin applied. (Check rules file)" ) for plugin_setup in tables.rules['plugins']: plugin = plugin_setup[0] plugin_cfg = plugin_setup[1] if opts.verbose: print('Applying plugin <%s>' % plugin) context = {} ns = dict(context=context) # currently only read plugins from plugindir (or current dir if no plugindir) fname = os.path.join(opts.plugindir, plugin + '.py') with open(fname, 'rb') as f: exec(compile(f.read(), fname, 'exec'), ns) # define and 'declare' the plugin temp = context['callback'](lists, plugin_cfg) # for each component of temp, append to the corresponding local ff block for name in list(temp): if lists.blocks.get(name) != None: merge_blocks2(lists.blocks[name], temp[name]) lists.blocks[name] = temp[name] # for each component of lists.blocks, append to the corresponding global ff block for name in list(lists.blocks): if blocks.get(name) != None: merge_blocks2(blocks[name], lists.blocks[name]) blocks[name] = lists.blocks[name] print('Building mae data structure...') # put the terms in the maestro file del M[ s + 1].ffio_ff # delete any existing ffio block - could be very important! ff_handle = M[s + 1].__new_block__('ffio_ff') # add header if not opts.no_header: ffio_add_header(ff_handle, opts.ffio_name, info) # add vdw func type ff_handle.ffio_vdw_func = vdw_func # add combining rule ff_handle.ffio_comb_rule = comb_rule # add pseudo block ffio_add_pseudo_block(ff_handle, pseudo_residue_list) # add each block automatically in sorted order keys = list(blocks) keys.sort() for k in keys: # skip cmap if no torsion_torsion if k[:4] == 'cmap' and len(blocks.get('torsion_torsion', [])) == 0: continue if k == 'atoms': ffio_add_atoms_block(ff_handle, blocks.get(k)) else: try: ffio_add_block(ff_handle, 'ffio_' + k, blocks.get(k)) except: # noqa: E722 print("Failed while adding 'ffio_%s':" % (k)) for f in blocks.get(k): print(f) if opts.is_water: add_water_constraints(ff_handle) # write the file print('Writing maestro file <%s>' % maeff_outfile) if maeff_outfile.endswith('.mae.gz') or maeff_outfile.endswith('.maegz'): # unfortunately, the implementation of print and str functions are different # if we just use below statement, we will miss linefeed in the actual output. # import gzip # gzip.open(maeff_outfile, 'w').write(str(M)) # The solution here is to write to a temporary file and read it back before # compressing it. from tempfile import mkstemp handle, tmpfile = mkstemp() with open(tmpfile, 'w') as f: f.write(str(M)) import gzip with open(tmpfile) as f: gzip.open(maeff_outfile, 'w').write(f.read()) os.remove(tmpfile) else: with open(maeff_outfile, 'w') as f: f.write(str(M))
if __name__ == '__main__': main()