#!/usr/bin/env desres-exec
#{
# exec desres-cleanenv -eUSER -eVIPARR_FFDIR -eVIPARR_PDIR \
# -m Python/2.5.1-09/bin \
# -m destro/0.4.7.1-D/lib-python \
# -m simplejson/1.7.4-1+Release \
# -- python $0 "$@"
#}
# viparr - force field parameter assignment program
# (Violet "Vi" Parr is the girl in The Incredibles who generates the force fields.)
import glob
import optparse
import platform
import six
import os
import sys
from schrodinger.application.desmond.packages.destro import Maeff
from .. import get_ffdir
from .. import get_pdir
from . import viparr_merge
from . import viparr_residue
from .version import viparr_version
from .viparr_plugins_util import parse_json
from .viparr_util import CtAtoms, CtBonds, ffio_add_atoms_block, \
ffio_add_block, ffio_add_header, ffio_add_pseudo_block
#import templates_check
if platform.system() == "Linux":
# Currently this causes a crash on Windows.
# The default precision appears to be 8 anyway...
from schrodinger.application.desmond.packages.destro import \
setglobalprecision
setglobalprecision(8)
[docs]class Struct(object):
pass
[docs]class Atom(object):
pass
[docs]def templates_convert(templates):
'''Convert templates structure to internal data structure.
'''
class Atom():
pass
for (name, res) in templates.items():
atoms = dict()
# atom structure is:
# [aname, atomic number, charge, [atypes]]
for a in res['atoms']:
atom = Atom()
atom.an = a[1]
atom.charge = a[2]
atom.atypes = a[3]
atoms[a[0]] = atom
templates[name]['atoms'] = atoms
[docs]def compress_residue_list(residue_list):
'''Compress FF representation if possible
'''
if len(residue_list) == 0:
return
if residue_list[0] == None:
return
first = residue_list[0].name
for res in residue_list:
if res == None:
break
if res.name != first:
break
else:
# natural termination of loop means all residues are the same
# now check the first residue to make sure all atom indices are contiguous
# if so, then this residue list should only contain one residue
if max(residue_list[0].nlist) + 1 == len(residue_list[0].nlist):
del residue_list[1:]
print('Compressing FF representation...')
[docs]def get_atom_list(atoms, neighbors, templates, residue_list):
'''Return a list of atoms in the ct structure. Each atom has the
following fields: pdbname, pdbres, resnum, atomic number, charge, atomtypes.
Also check that all residue and atom names are in the templates
data structure.
'''
class Atom(object):
pass
num_atoms = len(atoms)
# above is correct unless we have compressed the residue list
if len(residue_list) == 1:
if residue_list[0] != None:
num_atoms = len(residue_list[0].nlist)
atom_list = [None] * num_atoms
for ires, res in enumerate(residue_list):
if res != None:
# create marker to tell if atoms are polarizable
# Yuck... This needs to go...(its used in bondfields)
# and is specific to drude polarization
polist = set([
p[4]
for p in templates[res.name].get('pseudos', [])
if p[3].startswith("drude_")
])
for i, n in enumerate(res.nlist): # atom numbers are 0-based
a = Atom()
a.type = 'atom'
a.pdbname = res.alist[i]
a.pdbres = res.name
a.tmplname = res.name
a.resnum = ires + 1 # 1-based
a.neighbors = neighbors[n]
a.index = n
obj = templates[res.name]['atoms'][a.pdbname]
a.an = obj.an
a.charge = obj.charge
a.atypes = obj.atypes
if a.pdbname in polist:
a.polnotpol = True
else:
a.polnotpol = False
atom_list[n] = a
return atom_list
[docs]def get_neighbor_list(atom_list):
'''Return list of neighbor lists; one neighbor list for each atom, in order.
Each neighborlist is guaranteed to be sorted. Entries are 1-based.
'''
neighbor_list = list()
for a in atom_list:
if a == None:
neighbor_list.append([])
else:
neighbor_list.append(a.neighbors)
return neighbor_list
[docs]def get_bond_list(neighbor_list):
'''Return list of all bonds (i,j) such that i<j.
List will be sorted meaningfully. Entries are 1-based.
'''
bond_list = list()
bond_set = set()
num_atoms = len(neighbor_list)
for i in range(num_atoms):
for j in neighbor_list[i]:
if neighbor_list[j - 1] == []:
raise AssertionError(
'A bond (%d,%d) straddles your force fields' % (i + 1, j))
if i + 1 < j:
bond = (i + 1, j)
if bond not in bond_set:
bond_list.append(bond)
bond_set.add(bond)
return bond_list
[docs]def get_angle_list(neighbor_list):
'''Return list of all angles (i,j,k) such that i<k.
List will be sorted meaningfully. Entries are 1-based.
'''
angle_list = list()
num_atoms = len(neighbor_list)
for i in range(num_atoms):
b = neighbor_list[i] # assumes b[0]<b[1]<b[2]<b[3]
nb = len(b)
index = i + 1
if nb > 6:
six.reraise(AssertionError,
'Atom %d has more than six bonds:' % index, b)
for at0 in range(nb):
for at2 in range(at0 + 1, nb):
angle_list.append((b[at0], index, b[at2]))
angle_list.sort()
return angle_list
[docs]def get_proper_list(neighbor_list, bond_list):
'''Return list of all propers (i,j,k,l) such that i<l.
List will be sorted meaningfully. Entries are 1-based.
'''
proper_list = list()
for b in bond_list: # contains only one of (i,j) and (j,i)
# loop over possible (1,4)'s
for n0 in neighbor_list[b[0] - 1]:
if n0 == b[1]:
continue
for n1 in neighbor_list[b[1] - 1]:
if n1 == b[0]:
continue
if n1 == n0: # triangle
continue
if n0 < n1:
proper_list.append((n0, b[0], b[1], n1))
else:
proper_list.append((n1, b[1], b[0], n0))
proper_list.sort()
return proper_list
[docs]def get_bonded_term_list(fieldname, templates, residue_list, sorted):
'''Return a list of bonded items. Each bonded item is a list, namely:
[i,j] when fieldname = 'bonds' or 'exclusions'. if sorted, i<j
[i,j,k] when fieldname = 'angles'. if sorted, i<k
[i,j,k,l] when fieldname = 'propers' or 'impropers'. if sorted, i<l
The returned list will be sorted in a meaningful way.
'''
term_list = list()
for res in residue_list:
if res == None:
continue # ignore residue if templates does not know about it
name = res.name
terms = templates[name].get(fieldname, None)
if terms == None:
continue
for dih in terms:
nk = len(dih)
d = list()
for atom in dih:
a = res.atoms.get(atom)
if a == None:
raise AssertionError(
'In residue <"%s",%d> (%s), did not find atom %s' %
(res.chain, res.resnum_orig, res.name_orig, atom))
d.append(a + 1)
if len(d) == nk:
if d[0] > d[-1] and sorted:
d.reverse() # use sorted order i<j, i<k, i<l
term_list.append(tuple(d))
else:
print('Warning: some bonded terms were not found', nk)
term_list.sort()
return term_list
[docs]def merge(list0, list1):
'''Merge list1 into list0, as long as there are no entries in common.
'''
if list1 != None and list1 != []:
for p in list1:
if p in list0:
print(p, list1, list0)
raise AssertionError('Cannot merge lists; already in list')
else:
list0.append(p)
list0.sort()
[docs]def get_pseudo_terms(iffld, pseudo_residue_list, nres, nstart):
'''Return drude and virtual particle list (numbering starting at nstart) and list of 'bonds'
between drudes, virtual sites and particles. The pseudo_list can be appended to the atom list.
'''
if nres != 1 and nres != len(pseudo_residue_list):
raise AssertionError('Mismatched residue and pseudo residue lists')
pseudo_term_list = list()
pseudo_bond_list = list()
index = nstart - 1
for pres in pseudo_residue_list[:nres]:
for a in pres:
index += 1
if (iffld != a.ffldidx):
pseudo_term_list.append(None)
continue
if hasattr(a, 'index') and a.index != index - 1:
raise AssertionError('Bad Pseudo Indexes')
a.index = index - 1
pseudo_term_list.append(a)
# pseudo 'bonds'
pseudo_bond_list.append((a.partner_index, index))
if hasattr(a, 'partner_index2'):
pseudo_bond_list.append((a.partner_index2, index))
return pseudo_term_list, pseudo_bond_list
[docs]def update_pseudo_residue_list(iffld, pseudo_residue_list, atoms, templates,
residue_list):
'''pseudo_list is a list of all residues in ct block, and each residue is
a list of pseudo particles.
'''
for ires, res in enumerate(residue_list):
if res == None:
continue # ignore if do not recognize
# ignore if pseudo_list already has an entry for this residue
if len(pseudo_residue_list[ires]) != 0:
print('Warning: pseudo_list already has entry for residue')
continue
li = templates[res.name].get('pseudos', [])
for line in li:
nsite = line[4] #"primary site" of pseudo
site = res.atoms[nsite] # clone first atom in site definition
a = Atom()
a.ffldidx = iffld
a.type = 'pseudo'
a.pdbname = line[0]
a.chain = atoms[site].chain
a.resnum = atoms[site].resnum
a.pdbres = atoms[site].pdbres
a.tmplname = res.name
a.x = atoms[site].x
a.y = atoms[site].y
a.z = atoms[site].z
a.charge = line[1]
a.atypes = line[2]
a.field = line[3]
a.terms = line[4:]
a.partner_atom = nsite
a.partner_index = res.atoms[nsite] + 1
if a.field in ['virtual_midpoint']:
a.partner_index2 = res.atoms[line[5]] + 1
if hasattr(atoms[site], 'grp_thermostat'):
a.grp_thermostat = atoms[site].grp_thermostat
if hasattr(atoms[site], 'grp_energy'):
a.grp_energy = atoms[site].grp_energy
if (not (a.field.startswith("virtual_") or
a.field.startswith("drude_"))):
raise AssertionError('Unrecognized pseudo type: %s' %
(str(a.field)))
pseudo_residue_list[ires].append(a)
[docs]def construct_lists(iffld, atoms, neighbors, rules, templates, residue_list,
pseudo_residue_list):
'''Return lists that describe the structure.
'''
compress_residue_list(residue_list)
atom_list = get_atom_list(atoms, neighbors, templates, residue_list)
neighbor_list = get_neighbor_list(atom_list)
bond_list = get_bond_list(neighbor_list)
angle_list = get_angle_list(neighbor_list)
proper_list = get_proper_list(neighbor_list, bond_list)
# Keep input order for impropers (the order may be signigicant depending on how the coordinate is calculated)
improper_list = get_bonded_term_list('impropers', templates, residue_list,
False)
extra_angle_list = get_bonded_term_list('angles', templates, residue_list,
True)
extra_proper_list = get_bonded_term_list('propers', templates, residue_list,
True)
extra_excl_list = get_bonded_term_list('exclusions', templates,
residue_list, True)
# drude and virtual lists
pseudo_list, pseudo_bond_list = get_pseudo_terms(iffld, pseudo_residue_list,
len(residue_list),
len(atom_list) + 1)
atom_list.extend(pseudo_list)
merge(angle_list, extra_angle_list)
merge(proper_list, extra_proper_list)
# Setup for simultaneous generation of pairs and exclusions.
# default arguments in rules.get() are for backwards compatibility
excl_rule = rules.get('exclusions', 4)
if excl_rule < 1 or excl_rule > 4:
raise AssertionError(
"Invalid exclusion rule: %d. 'exclusions' must be between 1 and 4 inclusive"
% (excl_rule))
es_scale_list = rules.get('es_scale', [0.0] * (excl_rule - 1))
lj_scale_list = rules.get('lj_scale', [0.0] * (excl_rule - 1))
if (len(es_scale_list) != excl_rule - 1 or
len(lj_scale_list) != excl_rule - 1):
raise AssertionError(
"Invalid 'es_scale' (%d) or 'lj_scale' (%d) list lengths... Should be %d"
% (len(es_scale_list), len(lj_scale_list), excl_rule - 1))
# Keep track of (1,x) pairs separately while generating exclusions.
# Pair terms are only genertated up to and including the requested exclusion depth
# 1-2 pairs: particles separated by one bond and not in the extra exclusion list
# 1-3 pairs: particles separated by two bonds and not 1-{2 } pairs and not in the extra exclusion list
# 1-4 pairs: particles separated by three bonds and not 1-{2,3} pairs and not in the extra exclusion list
pair_list = []
excl_list = set(extra_excl_list)
if excl_rule >= 2:
tmp = set([(a[0], a[1]) for a in bond_list])
tmp.difference_update(excl_list)
pair_list.append(tmp)
excl_list.update(tmp)
if excl_rule >= 3:
tmp = set([(a[0], a[2]) for a in angle_list])
tmp.difference_update(excl_list)
pair_list.append(tmp)
excl_list.update(tmp)
if excl_rule >= 4:
tmp = set([(a[0], a[3]) for a in proper_list])
tmp.difference_update(excl_list)
pair_list.append(tmp)
excl_list.update(tmp)
excl_list = list(excl_list)
excl_list.sort()
for i, p in enumerate(pair_list):
p = list(p)
p.sort()
pair_list[i] = p
lists = Struct()
lists.atom_list = atom_list
lists.neighbor_list = neighbor_list
lists.residue_list = residue_list
lists.bond_list = bond_list
lists.angle_list = angle_list
lists.proper_list = proper_list
lists.improper_list = improper_list
lists.excl_list = excl_list
lists.pair_list = pair_list
lists.es_scale_list = es_scale_list
lists.lj_scale_list = lj_scale_list
lists.pseudo_list = pseudo_list
lists.pseudo_bond_list = pseudo_bond_list
# UNDONE: excl and pairs and cmap are lists of tuples, unlike the other lists
return lists
[docs]def merge_blocks(first, second):
'''Merge second block into first; print warning message if element in
second block is already in the first block.
Assumes blocks themselves hold unique entries.
First block is the merged one.
'''
firstset = set([tuple([tuple(f[0]), f[1]]) for f in first])
for e in second:
key = tuple([tuple(e[0]), e[1]])
if key in firstset:
print('Ignoring subsequent match:', e)
else:
first.append(e)
[docs]def merge_blocks2(first, second):
'''Merge first block into second; print warning message if element in
first block will override element in second block.
Assumes blocks themselves hold unique entries.
Second block is the merged one. [NOT THE SAME BEHAVIOR AS ABOVE.]
'''
seconddict = dict()
for i, e in enumerate(second):
t = tuple([tuple(e[0]), e[1]])
seconddict[t] = i
for f in first:
key = tuple([tuple(f[0]), f[1]])
if key in seconddict:
ind = seconddict[key]
print('Ignoring subsequent match:', second[ind])
second[ind] = f
else:
second.append(f)
[docs]def add_water_constraints(ff_handle):
"""
Add an ffio_constraints block to the ff_handle for a water ct by reading
the ffio_c1 values from the angles and bonds, and append
'_constrained' to the functional forms of the angles and bonds.
:param ff_handle: the handle of the ffio_block
:type ff_handle: destro.Destro
"""
# this will be a list of ffio_c1 values for [angle, bond1, bond2]
ffio_c1s = []
for block in [ff_handle.ffio_angles, ff_handle.ffio_bonds]:
for row in block:
funct = row.ffio_funct
if not funct.endswith('_constrained'):
row.ffio_funct = funct + '_constrained'
ffio_c1s.append(row.ffio_c1)
block = [[[1, 2, 3], 'HOH', ffio_c1s]]
del ff_handle.ffio_constraints
ffio_add_block(ff_handle, 'ffio_constraints', block)
################################################################################
[docs]def main(vargs=[]): # noqa: M511
usage = '''
%prog [options] mae_file maeff_outfile
Description:
viparr is a force field parameter assignment program.
-f and -d are used to specify force fields; the order of force fields
is important: earlier ones take precedence over later ones.
Simple example:
%prog -f charmm27 -f spc mae_file maeff_outfile
More complicated example:
%prog -d me/myfiles/myff -f charmm27 -f spc mae_file maeff_outfile
'''
def parser_add_directory(option, opt_str, value, parser, ffdir):
if ffdir == None:
raise AssertionError('VIPARR_FFDIR environment variable undefined; '+ \
'no built-in force fields can be specified')
parser.values.forcefield.append(os.path.join(ffdir, value))
def parser_merge(option, opt_str, value, parser):
if len(parser.values.forcefield) == 0:
raise AssertionError('Specify -f or -d before -m')
print('Merging <%s> and <%s>' % (parser.values.forcefield[-1], value))
destdir = viparr_merge.merge(parser.values.forcefield[-1], value)
print('Using temporary directory <%s>' % destdir)
parser.values.forcefield[-1] = destdir
ffdir = get_ffdir()
pdir = get_pdir()
if not ffdir:
usage += 'VIPARR_FF_PATH is empty: No built-in force fields'
else:
usage += 'Available built-in force fields:\n '
temp = os.listdir(ffdir)
temp.sort()
usage += '\n '.join(temp)
opt = optparse.OptionParser(usage=usage, version=viparr_version)
opt.set_defaults(ctnum=None)
opt.add_option('-c',
type='int',
dest='ctnum',
action="append",
help='''run viparr for selected CT blocks only, e.g., for the
first and third CT blocks, use "-c 1 -c 3"''')
opt.add_option('-f',
type='string',
dest='ffname',
action='callback',
callback=parser_add_directory,
callback_args=(ffdir,),
help='''built-in force field name; several can be listed,
each preceded by its own -f''')
opt.add_option('-d',
action='append',
dest='forcefield',
default=[],
help='''user-provided force field directory; several can
be listed, each preceded by its own -d''')
opt.add_option('-m',
type='string',
dest='merge_dir',
action='callback',
callback=parser_merge,
help='''path to user-defined force field directory that
is to be merged with previously specified force field;
several can be listed, each preceded by its own -m''')
opt.add_option('-n',
dest='ffio_name',
default='Generated by viparr',
help='force field name to put into output file')
opt.add_option('-p',
dest='plugindir',
default=pdir,
help='override for plugins directory (advanced usage)')
opt.add_option('-x',
action="store_true",
dest='no_header',
default=False,
help='do not print header info (for testing purposes)')
opt.add_option('-v',
action="store_true",
dest='verbose',
default=False,
help='verbose output')
opt.add_option(
'-w',
action="store_true",
dest='is_water',
default=False,
help='use this option when running viparr on a water ct, this '
'will recreate any constraints block that is removed')
if (vargs == []):
opts, args = opt.parse_args()
else:
opts, args = opt.parse_args(vargs)
print('viparr', viparr_version)
# check number of arguments first
if len(args) != 2:
opt.error('incorrect number of arguments: %d %s' %
(len(args), str(args)))
mae_file = args[0]
maeff_outfile = args[1]
# check that there is at least one forcefield
if len(opts.forcefield) < 1:
raise AssertionError('No force field specified')
# check that each directory exists
print('All FF directories, in order:\n', '\n'.join(opts.forcefield))
for dir in opts.forcefield:
if not os.path.isdir(dir):
raise AssertionError('Force field dir <%s> is not a directory' %
dir)
print('Plug-in directory:', opts.plugindir)
if opts.plugindir == None:
raise AssertionError('No plug-in directory specified with -p or with '+ \
'VIPARR_PDIR environment variable')
else:
if not os.path.isdir(opts.plugindir):
raise AssertionError('Plug-in dir <%s> is not a directory' %
opts.plugindir)
# open maestro file
print('Reading maestro file <%s> ...' % mae_file)
if mae_file.endswith('.mae.gz') or mae_file.endswith('.maegz'):
import gzip
mae_buffer = gzip.open(mae_file, 'rb').read()
M = Maeff(mae_buffer)
else:
with open(mae_file) as f:
M = Maeff(f)
print('Done reading maestro file')
num_structures = len(M)
# create structure list, which is 0-based list of ct numbers to process
if opts.ctnum == None:
structure_list = list(range(num_structures))
else:
structure_list = []
for e in opts.ctnum:
if e > num_structures or e < 1:
raise AssertionError('Bad value <%d> for selected CTNUM' % e)
structure_list.append(e - 1)
# skip structure if it is a "full_system"
for s in structure_list:
if M[s + 1].__contains__('ffio_ct_type'):
if M[s + 1].ffio_ct_type == 'full_system':
print('Skipping full_system structure <%d>' % (s + 1))
structure_list.remove(s)
if len(structure_list) == 0:
print('No CT blocks to process...exiting')
sys.exit(1)
# process the atoms and bonds in each ct
# this should be the only place that use CtBonds and CtAtoms
# ct_atoms and ct_bonds may have entries that are None
ct_atoms = [None] * num_structures
ct_bonds = [None] * num_structures
for s in structure_list:
print('Analyzing ct block %d' % (s + 1))
ct = M[s + 1]
ct_bonds[s] = CtBonds(ct)
ct_atoms[s] = CtAtoms(ct)
# residue list for each structure for each force field
all_residue_lists = [[[] for i in range(num_structures)] \
for j in range(len(opts.forcefield))]
######################################################################
# Loop for constructing the residue lists
# We do this as a pre-processing step so that we can check that all residues
# were matched by some template
for idir, dir in enumerate(opts.forcefield):
templates = {}
tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))]
for tmpl in tnames:
viparr_merge.update_templates_strict(templates, tmpl)
#templates_check.templates_check(templates)
viparr_residue.templates_reorder_atoms(templates)
# loop through all ct blocks
for s in structure_list:
print('******Constructing residue list for <%s> Structure %d' %
(dir, s + 1))
residue_list = viparr_residue.get_residue_list(
ct_atoms[s], ct_bonds[s], templates)
# save this residue list
all_residue_lists[idir][s] = residue_list
######################################################################
# Check that all residues were matched by some force field
# and print diagnostic messages.
# Also do the essential step of overwriting unmatched residues with 'None'
derror = dict()
dwarn = dict()
dmatch = dict()
num_messages = 5 # number of messages per residue name
num_errors = 0
for s in structure_list:
for i in range(len(all_residue_lists[0][s])):
matches = []
for idir, dir in enumerate(opts.forcefield):
res = all_residue_lists[idir][s][i]
if res.match:
matches.append(dir)
if res.name_orig != res.name:
pair = (res.name_orig, res.name)
dmatch[pair] = dmatch.setdefault(pair, 0) + 1
if dmatch[pair] <= num_messages:
print('Note: Struct %d, Res <"%s",%d> (%s) matched (%s)' % \
(s +1, res.chain, res.resnum_orig, res.name_orig, res.name))
if dmatch[pair] == num_messages:
print(
'Ignoring further messages for (%s) matching (%s)' %
pair)
if len(matches) != 1:
res = all_residue_lists[idir][s][i]
which = (s + 1, res.chain, res.resnum_orig, res.name_orig)
if len(matches) == 0:
num_errors = num_errors + 1
derror[res.name_orig] = derror.setdefault(res.name_orig,
0) + 1
if derror[res.name_orig] <= num_messages:
formula = viparr_residue.residue_formula(
[ct_atoms[s][ii].atomicnum for ii in res.nlist])
print(
'Error: Struct %d, Res <"%s",%d> (%s) not matched by any force field template'
% which, 'with formula', formula)
if derror[res.name_orig] == num_messages:
print('Ignoring further errors for (%s)' %
res.name_orig)
elif len(matches) > 1:
dwarn[res.name_orig] = dwarn.setdefault(res.name_orig,
0) + 1
if dwarn[res.name_orig] <= num_messages:
print(
'Warning: Struct %d, Res <"%s",%d> (%s) matched by multiple force fields:'
% which, matches)
if dwarn[res.name_orig] == num_messages:
print('Ignoring further warnings for (%s)' %
res.name_orig)
for idir, dir in enumerate(opts.forcefield):
if all_residue_lists[idir][s][i].match == False:
all_residue_lists[idir][s][i] = None # overwrite
if num_errors > 0:
sys.exit(1)
###################################################################################
# Get the combining rules and function type and check that they are consistent
# also check the forcefield versions for incompatability
comb_rule = set()
vdw_func = set()
for dir in opts.forcefield:
rules = parse_json(os.path.join(dir, 'rules'))
minver = rules.get('min_viparr_version', viparr_version)
if (list(map(int, minver.split('.'))) > list(
map(int, viparr_version.split('.')))):
raise AssertionError(
'Forcefield %s needs viparr version >= %s. Current version = %s'
% (dir, minver, viparr_version))
x = rules.get('vdw_comb_rule', '').upper()
if x == '':
x = 'WATER'
comb_rule.add(x)
x = rules.get('vdw_func', 'LJ12_6_sig_epsilon')
vdw_func.add(x)
if len(vdw_func) != 1:
print(vdw_func)
raise AssertionError('Inconsistent vdW func rules among force fields')
vdw_func = vdw_func.pop()
if comb_rule == set(['WATER', 'ARITHMETIC/GEOMETRIC']) or \
comb_rule == set(['WATER', 'GEOMETRIC']):
comb_rule.remove('WATER')
if comb_rule == set(['WATER']):
comb_rule = set(['ARITHMETIC/GEOMETRIC'])
if len(comb_rule) != 1:
print(comb_rule)
raise AssertionError('Inconsistent vdW mixing rules among force fields')
comb_rule = comb_rule.pop()
###################################################################################
# Get the exclusion rules and check that they are consistent
# This is NOT necessary! We already fail if a bond straddles 2 forcefields.
# This prevents us from needing to worry about mixing 2 different exclusion rules
# as exclusions/pair are based on topology...
#
#exclusion_rule = []
#for dir in opts.forcefield:
# rules = parse_json(os.path.join(dir, 'rules'))
# x = rules.get('exclusions', 4)
# if( x not in exclusion_rule):
# exclusion_rule.append(x)
#if len(exclusion_rule) != 1:
# print exclusion_rule
# raise AssertionError, 'Inconsistent exclusion rules among force fields'
#exclusion_rule = exclusion_rule[0]
######################################################################
# Main loop algorithm:
# Loop over the structures in the ct
# Loop over the force fields (some duplicate work here if multiple structures)
# Loop over the plug-ins
# Apply plug-in
for s in structure_list:
print('******Main loop: Structure', s + 1)
strules = []
sttemplates = []
# length is num_residues in structure
pseudo_residue_list = [[] for i in range(len(all_residue_lists[0][s]))]
# We need all the pseudos in the structure now so we can get the site indexing correct
for idir, dir in enumerate(opts.forcefield):
print('Pre-processing force field <%s> for rules and pseudos' % dir)
strules.append(parse_json(os.path.join(dir, 'rules')))
templates = {}
tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))]
for tmpl in tnames:
viparr_merge.update_templates_strict(templates, tmpl)
templates_convert(templates)
# update pseudo_list
update_pseudo_residue_list(
idir, pseudo_residue_list, ct_atoms[s], templates,
all_residue_lists[idir][s]) # before compression
sttemplates.append(templates)
tables = Struct() # the tables describing the chemical structure
blocks = dict(
) # dictionary of blocks that will be written to the mae file
info = list(
) # list of strings which are references for the force fields
info.append(('Plugins: <%s>' % opts.plugindir, ''))
for idir, dir in enumerate(opts.forcefield):
print('Processing force field <%s>' % dir)
tables.rules = strules[idir]
tables.templates = sttemplates[idir]
# construct lists that describe the chemical system for this force field
lists = construct_lists(idir, ct_atoms[s], ct_bonds[s],
tables.rules, tables.templates,
all_residue_lists[idir][s],
pseudo_residue_list)
# so rules can be passed to plugins...
# Viparr should take the position that rules are for internal use only.
# This dosent stop viparr from passing certain of these values to plugins however (see construct_lists)
#
# lists.rules = tables.rules
lists.comb_rule = comb_rule
lists.vdw_func = vdw_func
# so that ff directory is passed to plugins...
lists.dir = dir
# pass templates
lists.templates = tables.templates
# pass processed blocks
lists.blocks = {}
#info.append('FF dir: <%s>' % lists.dir) # force field directory
#info.extend(lists.rules.get('info', ['No info']))
ffd = 'FF: <%s>' % lists.dir
data = tables.rules.get('info', ['No info'])
for d in data:
info.append((ffd, d))
# loop over plug-ins
pluglist = [p[0] for p in tables.rules['plugins']]
if ('atoms' in pluglist and pluglist[-1] != 'atoms'):
raise UserWarning(
"atoms plugin should be last plugin applied. (Check rules file)"
)
for plugin_setup in tables.rules['plugins']:
plugin = plugin_setup[0]
plugin_cfg = plugin_setup[1]
if opts.verbose:
print('Applying plugin <%s>' % plugin)
context = {}
ns = dict(context=context)
# currently only read plugins from plugindir (or current dir if no plugindir)
fname = os.path.join(opts.plugindir, plugin + '.py')
with open(fname, 'rb') as f:
exec(compile(f.read(), fname, 'exec'),
ns) # define and 'declare' the plugin
temp = context['callback'](lists, plugin_cfg)
# for each component of temp, append to the corresponding local ff block
for name in list(temp):
if lists.blocks.get(name) != None:
merge_blocks2(lists.blocks[name], temp[name])
lists.blocks[name] = temp[name]
# for each component of lists.blocks, append to the corresponding global ff block
for name in list(lists.blocks):
if blocks.get(name) != None:
merge_blocks2(blocks[name], lists.blocks[name])
blocks[name] = lists.blocks[name]
print('Building mae data structure...')
# put the terms in the maestro file
del M[
s +
1].ffio_ff # delete any existing ffio block - could be very important!
ff_handle = M[s + 1].__new_block__('ffio_ff')
# add header
if not opts.no_header:
ffio_add_header(ff_handle, opts.ffio_name, info)
# add vdw func type
ff_handle.ffio_vdw_func = vdw_func
# add combining rule
ff_handle.ffio_comb_rule = comb_rule
# add pseudo block
ffio_add_pseudo_block(ff_handle, pseudo_residue_list)
# add each block automatically in sorted order
keys = list(blocks)
keys.sort()
for k in keys:
# skip cmap if no torsion_torsion
if k[:4] == 'cmap' and len(blocks.get('torsion_torsion', [])) == 0:
continue
if k == 'atoms':
ffio_add_atoms_block(ff_handle, blocks.get(k))
else:
try:
ffio_add_block(ff_handle, 'ffio_' + k, blocks.get(k))
except: # noqa: E722
print("Failed while adding 'ffio_%s':" % (k))
for f in blocks.get(k):
print(f)
if opts.is_water:
add_water_constraints(ff_handle)
# write the file
print('Writing maestro file <%s>' % maeff_outfile)
if maeff_outfile.endswith('.mae.gz') or maeff_outfile.endswith('.maegz'):
# unfortunately, the implementation of print and str functions are different
# if we just use below statement, we will miss linefeed in the actual output.
# import gzip
# gzip.open(maeff_outfile, 'w').write(str(M))
# The solution here is to write to a temporary file and read it back before
# compressing it.
from tempfile import mkstemp
handle, tmpfile = mkstemp()
with open(tmpfile, 'w') as f:
f.write(str(M))
import gzip
with open(tmpfile) as f:
gzip.open(maeff_outfile, 'w').write(f.read())
os.remove(tmpfile)
else:
with open(maeff_outfile, 'w') as f:
f.write(str(M))
if __name__ == '__main__':
main()