Source code for schrodinger.application.desmond.fep_util

from past.builtins import cmp

import schrodinger.utils.log as log
from schrodinger.application.desmond import constants

logger = log.get_output_logger(name="fep_util")
logger.setLevel(log.DEBUG)


[docs]def check1_subst_code(source_ct, dest_ct): """ take input cts and check single ring-atom/attachment subst_code, return true if pass """ def find_marked_atoms(ct): atoms_98 = [] atoms_99 = [] atoms_97 = [] for atom in ct.atom: if atom.property[constants.FEP_SUBST] == 98: atoms_98.append(atom) if atom.property[constants.FEP_SUBST] == 97: atoms_97.append(atom) if atom.property[constants.FEP_SUBST] == 99: atoms_99.append(atom) return (atoms_98, atoms_97, atoms_99) (source_98_atoms, source_97_atoms, source_99_atoms) = find_marked_atoms(source_ct) (dest_98_atoms, dest_97_atoms, dest_99_atoms) = find_marked_atoms(dest_ct) # print source_98_atoms, source_97_atoms, source_99_atoms if len(source_98_atoms) == 0 or len(dest_98_atoms) == 0: logger.error("No anchor atoms found") return False if len(source_99_atoms) == 0 and len(dest_99_atoms) == 0 and len( source_97_atoms) == 0 and len(dest_97_atoms) == 0: logger.error("No bridge atoms found") return False if len(source_97_atoms) != len(dest_97_atoms): logger.error( "Number of linker/ring atoms changed not the same in product and reatant" ) return False if (len(source_99_atoms) > 1 or len(dest_99_atoms) > 1 ) and (len(source_98_atoms) > 1 or len(dest_98_atoms)) > 1: logger.error("Too many attachment point detected") return False if len(source_97_atoms) == 1 or len(dest_97_atoms) == 1: if len(source_98_atoms) < 2 or len(dest_98_atoms) < 2: logger.error("Not enough connection for ring/linker atom") return False for atoms_98 in source_98_atoms: if atoms_98 not in source_97_atoms[0].bonded_atoms: logger.error( "anchor atom not bonded to ring/linker atom in original molecule" ) return False for atoms_98 in dest_98_atoms: if atoms_98 not in dest_97_atoms[0].bonded_atoms: logger.error( "anchor atom not bonded to ring/linker atom in mutated molecule" ) return False return True
[docs]def check_subst_code(source_ct, dest_ct): """ Take input cts and check multiple ring-atom/attachment subst_code, return true if pass :return: (error_code, msg) where error_code: - 0 no inconsistance detected - 1 too complicated, need manual inspection - -1 error """ def find_bridge_atoms(atom): """ input atom object return (98_atoms, 97_atoms) """ bridge_atoms = [] ring_atoms = [] #find all bridge atoms attached to anchor for a in atom.bonded_atoms: if a.property[constants.FEP_SUBST] % 100 == 98: bridge_atoms.append(atom) elif a.property[constants.FEP_SUBST] % 100 == 97: ring_atoms.append(atom) return (bridge_atoms, ring_atoms) def find_marked_atoms(ct, n=0): atoms_98 = [] atoms_99 = [] atoms_97 = [] for atom in ct.atom: if atom.property[constants.FEP_SUBST] == 98 + n * 100: atoms_98.append(atom) if atom.property[constants.FEP_SUBST] == 97 + n * 100: atoms_97.append(atom) if atom.property[constants.FEP_SUBST] == 99 + n * 100: atoms_99.append(atom) return (atoms_98, atoms_97, atoms_99) for i in range(20): (source_98_atoms, source_97_atoms, source_99_atoms) = find_marked_atoms(source_ct, i) (dest_98_atoms, dest_97_atoms, dest_99_atoms) = find_marked_atoms(dest_ct, i) # print source_98_atoms, source_97_atoms, source_99_atoms if i == 0: if len(source_98_atoms) == 0 or len(dest_98_atoms) == 0: logger.error("No anchor atoms found") return (-1, "No anchor atoms found, identic molecules?") else: if len(source_98_atoms) == 0 and len(dest_98_atoms) == 0: logger.info("No more anchor atoms found %d" % (i * 100 + 98)) continue # return (0, "pass") if len(source_99_atoms) == 0 and len(dest_99_atoms) == 0 and len( source_97_atoms) == 0 and len(dest_97_atoms) == 0: if len(source_98_atoms) > 1 or len(dest_98_atoms) > 1: logger.error("extra anchor atoms found source: %s dest: %s" % ([ (x.index, x.property[constants.FEP_SUBST]) for x in source_98_atoms ], [(x.index, x.property[constants.FEP_SUBST]) for x in dest_98_atoms])) return (-1, "extra anchor atoms found source: %s dest: %s" % ([ (x.index, x.property[constants.FEP_SUBST]) for x in source_98_atoms ], [(x.index, x.property[constants.FEP_SUBST]) for x in dest_98_atoms])) #check if there is a 97 atom connect to the anchor atom source_anchor = source_98_atoms[0] source_bridge_atoms = [] for atom in source_anchor.bonded_atoms: if atom.property[constants.FEP_SUBST] % 100 == 97: source_bridge_atoms.append(atom) dest_anchor = dest_98_atoms[0] dest_bridge_atoms = [] for atom in dest_anchor.bonded_atoms: if atom.property[constants.FEP_SUBST] % 100 == 97: dest_bridge_atoms.append(atom) if len(source_bridge_atoms) == 0 and len(dest_bridge_atoms) == 0: logger.error("No bridge atoms found") return (-1, "No bridge atoms found") if len(source_97_atoms) != len(dest_97_atoms): logger.error( "Number of linker/ring atoms changed not the same in product and reatant" ) return ( -1, "Number of linker/ring atoms changed not the same in product and reatant" ) if (len(source_99_atoms) > 1 or len(dest_99_atoms) > 1 ) and (len(source_98_atoms) > 1 or len(dest_98_atoms)) > 1: logger.error("Too many attachment point detected") return (-1, "Too many attachment point detected") source_bridge_atoms = [] source_anchor = source_98_atoms[0] #find all bridge atoms attached to anchor for atom in source_anchor.bonded_atoms: if atom.property[constants.FEP_SUBST] % 10 == 9: source_bridge_atoms.append(atom) if (len(source_bridge_atoms) > 1): # logger.error("source: %s attachments connected to the same anchor atom %d" % ([x.index for x in source_bridge_atoms], source_anchor.index)) # return (-1, "source: %s attachments connected to the same anchor atom %d" % ([x.index for x in source_bridge_atoms], source_anchor.index)) anchor_code = source_anchor.property[constants.FEP_SUBST] bridge_codes = [ a.property[constants.FEP_SUBST] for a in source_bridge_atoms ] ref_bg_codes = [ 99 - x * 10 for x in range(len(source_bridge_atoms)) ] ref_bg_codes.sort() bridge_codes.sort() if cmp(ref_bg_codes, bridge_codes) != 0: return (-1, "wrong codes on multiple changes on source atom %d" % source_anchor.index) dest_bridge_atoms = [] dest_anchor = dest_98_atoms[0] #find all bridge atoms attached to anchor for atom in dest_anchor.bonded_atoms: if atom.property[constants.FEP_SUBST] % 10 == 9: dest_bridge_atoms.append(atom) if (len(dest_bridge_atoms) > 1): # logger.error("dest: %s attachments connected to the same anchor atom %d" % ([(x.index, x.property[constants.FEP_SUBST]) for x in dest_bridge_atoms], dest_anchor.index)) # return (-1, "dest: %s attachments connected to the same anchor atom %d" % ([(x.index, x.property[constants.FEP_SUBST]) for x in dest_bridge_atoms], dest_anchor.index)) anchor_code = dest_anchor.property[constants.FEP_SUBST] bridge_codes = [ a.property[constants.FEP_SUBST] for a in dest_bridge_atoms ] ref_bg_codes = [99 - x * 10 for x in range(len(dest_bridge_atoms))] ref_bg_codes.sort() bridge_codes.sort() if cmp(ref_bg_codes, bridge_codes) != 0: return (-1, "wrong codes on multiple changes on dest atom %d" % source_anchor.index) if len(source_97_atoms) == 1 or len(dest_97_atoms) == 1: if len(source_98_atoms) < 2 or len(dest_98_atoms) < 2: # logger.error("Not enough connection for ring/linker atom") # return (-1, "Not enough connection for ring/linker atom") (source_bridge_atoms, source_ring_atoms) = find_bridge_atoms(source_97_atoms[0]) if len(source_bridge_atoms) + len(source_ring_atoms) < 2: #at least two bridge/linker/atom connected to ring/linker return ( -1, "not enough ring/linker atoms connected to source ring/linker atoms %d" % (source_anchor.index)) (dest_bridge_atoms, dest_ring_atoms) = find_bridge_atoms(dest_97_atoms[0]) if len(dest_bridge_atoms) + len(dest_ring_atoms) < 2: return ( -1, "not enough ring/linker atoms connected to dest ring/linker atoms %d" % (dest_anchor.index)) for atoms_98 in source_98_atoms: if atoms_98 not in source_97_atoms[0].bonded_atoms: logger.error( "anchor atom not bonded to ring/linker atom in original molecule" ) return ( -1, "source anchor atom not bonded to ring/linker atom in original molecule" ) for atoms_98 in dest_98_atoms: if atoms_98 not in dest_97_atoms[0].bonded_atoms: logger.error( "dest anchor atom not bonded to ring/linker atom in mutated molecule" ) return ( -1, "dest anchor atom not bonded to ring/linker atom in mutated molecule" ) elif len(source_97_atoms) > 1 or len(dest_97_atoms) > 1: for (source, dest) in zip(source_97_atoms, dest_97_atoms): (source_bridge_atoms, source_ring_atoms) = find_bridge_atoms(source) if len(source_bridge_atoms) + len(source_ring_atoms) < 2: #at least two bridge/linker/atom connected to ring/linker return ( -1, "not enough ring/linker atoms connected to source ring/linker atoms %d" % (source.index)) (dest_bridge_atoms, dest_ring_atoms) = find_bridge_atoms(dest) if len(dest_bridge_atoms) + len(dest_ring_atoms) < 2: return ( -1, "not enough ring/linker atoms connected to dest ring/linker atoms %d" % (dest.index)) logger.info( "multiple ring/linker atoms detected, need manual inspection") logger.info("source: %s" % [(x.index, x.property[constants.FEP_SUBST]) for x in source_97_atoms]) logger.info("dest: %s" % [(x.index, x.property[constants.FEP_SUBST]) for x in dest_97_atoms]) return ( 1, "multiple ring/linker atoms detected, need manual inspection\n source: %s\n dest: %s" % ([(x.index, x.property[constants.FEP_SUBST]) for x in source_97_atoms ], [(x.index, x.property[constants.FEP_SUBST]) for x in dest_97_atoms])) return (0, "pass")
[docs]def get_umbrella_mode_command_line(jobname, subhost, use_queue=1, num_gpu=0, num_cpu=1, num_windows=12): """ Given number of processors, return command line argument :param jobname: name of the job :param use_queue: is the job submitted to a queuing system or not :param subhost: host entry for computing slots :param num_gpu: number of gpu for each window :param num_cpu: number of cpu for each window :param num_windows: number of replicas/windows :return: list of string for command line argument """ ret_val = ["-JOBNAME", jobname, "-mode", "umbrella"] max_job = num_gpu num_slot = num_cpu if num_gpu: num_slot = num_gpu else: max_job = num_windows num_slot = num_cpu * num_windows if not use_queue and not num_gpu: max_job = 1 num_slot = num_cpu ret_val.append("-maxjob") ret_val.append("%d" % max_job) ret_val.append("-HOST") ret_val.append(subhost + ":%d" % num_slot) ret_val.append("-cpu") ret_val.append("%d" % num_cpu) return ret_val