Source code for schrodinger.application.ska

"""
Core routines for wrapping SKA backend.

This module provides direct access to the functionality used by the
top-level ska program and structalign utility.  The usual keyword-value
options in the top-level input file are supplied as a python dict.
Wrappers are supplied to provide PDB filenames as input as well as
maestro CTs which are transformed in place.

For higher level API, use the schrodinger.structutils.structalign module.

Copyright Schrodinger, LLC. All rights reserved.

"""

import operator
import os
import re
import shutil
import tempfile

import numpy

from schrodinger import structure
from schrodinger.infra import _ska
from schrodinger.infra import mm
from schrodinger.job.util import hunt
from schrodinger.structutils import analyze
from schrodinger.ui.sequencealignment import constants as seqdata
from schrodinger.utils.fileutils import force_remove

_true_val = re.compile(r'^(1|TRUE|YES)$', re.I)
_atom_rec = re.compile(
    r'^((ATOM  |HETATM).{24})([ -.\d]{8})([ -.\d]{8})([ -.\d]{8})(.*)$')
_winlen_vals = [2, 6, 9, 14, 17]
_trolltop = None

SKA_TRANSFORMATION_MATRIX = "s_psp_ska_transformation_matrix"


class _SkaData(object):
    """
    Data structure for named backend return values
        rotmat: list of (U,x) rotation matrices and translations
        align: aligned sequences as a python dict
        psd: backend PSD score
        rmsd: superposition RMSD
        stdout: raw output from the backend
        sse: dict of secondary structures
    """

    def __init__(self, rotmat, align, index, psd, rmsd, stdout, sse=None):
        self.rotmat = rotmat
        self.align = align
        self.index = index
        self.psd = psd
        self.rmsd = rmsd
        self.stdout = stdout
        self.sse = sse

    def setTransformationMatrixProperty(self, st, index=0):
        """
        Sets ct-level properties that correspond to the 3x3 rotation and 3x1
        translation in the transformation of this data structure. For SkaData
        with more than one alignment structure, an index can be provided to
        obtain a specific structure's transformation.

        :param st: structure to add properties to
        :type st: structure.Structure

        :param index: index of the transformation to extract
        :type index: int
        """
        if not self.rotmat or self.rotmat == [None]:
            return

        matrix = numpy.identity(4)
        rotation_matrix, translation_vector = self.rotmat[index]
        matrix[0:3, 0:3] = rotation_matrix
        matrix[0:3, 3] = translation_vector
        st.property[SKA_TRANSFORMATION_MATRIX] = ';'.join(map(str, matrix.flat))


[docs]def get_ska_transformation_matrix(st): """ Retrieves a transformation matrix from the given structure decorated with a ska alignment transformation. :return: 4x4 transformation matrix :rtype: numpy.ndarray """ if SKA_TRANSFORMATION_MATRIX not in st.property: return None matrix_str = st.property[SKA_TRANSFORMATION_MATRIX] return numpy.reshape(list(map(float, matrix_str.split(';'))), (4, 4))
[docs]def transform_structure(st, u, x): """ perform in-place transformation """ # perform in-place transformation st.setXYZ(numpy.inner(st.getXYZ(), u) + x) # FIXME Is there a way we can speed this up by doing more on the C side?? # Ev:125336 Transform alternate positions also: for atom in st.atom: alt_xyz = atom.alt_xyz if alt_xyz is not None: new_alt_xyz = numpy.inner(numpy.array(alt_xyz), u) + x atom.alt_xyz = new_alt_xyz
def _standardize(old_ct, rename, reorder): """ Return the standardized version of the input structure, without modifying the input CT. :type rename: bool :param rename: Whether to convert the non-standard residue names to standard forms where possible (e.g. HID -> HIS). :type reorder: bool :param reorder: Whether to re-order the residues by sequence. If both options are False, will return the original input structure, without making a copy. """ if not rename and not reorder: return old_ct if reorder: new_ct = structure.create_new_structure(0) mm.mmpdb_initialize(mm.error_handler) index_map = mm.mmpdb_get_sequence_order(old_ct.handle, 0) mm.mmpdb_terminate() mm.mmct_ct_reorder(new_ct, old_ct, index_map) else: # Re-naming but not re-ordering; make a copy: new_ct = old_ct.copy() if rename: for res in new_ct.residue: rname = res.pdbres[0:3] rcode = seqdata.AMINO_ACIDS_3_TO_1.get(rname, 'X') alias = seqdata.AMINO_ACIDS[rcode][0] if rname != alias: for at in res.atom: at.pdbres = alias return new_ct
[docs]def pairwise_align_ct(query, templist, keywords=None, log=None, debug=False, save_props=False, std_res=False, reorder=False, asl=None, asl_mobile=None): """ Wrapper for a series of pairwise ska jobs with CTs as input. NOTE: For higher-level API, use schrodinger.structutils.structalign module. :param query: fixed reference structure :type query: (seqname, ct) tuple :param templist: one or more structures to be aligned :type templist: list of (seqname, ct) tuples :param keywords: top-level keyword-value input options (other than QUERY_FILE and TEMPLATE) :type keywords: dict :param log: active logger for diagnostic messages :type log: logging.logger :param debug: debug flag passed to backend :type debug: bool :param save_props: True if output data included as CT properties :type save_props: bool :param std_res: True if residue names are translated to standard forms :type std_res: bool :param reorder: True if residues need to be reordered by connectivity :type reorder: bool :param asl: ASL for reference substructure to align :type asl: str :param asl_mobile: ASL for mobile substructures to align :type asl_mobile: str :return: list of alignments corresponding to templist :rtype: list of `_SkaData` """ # FIXME factor out common code with multiple_align_ct() if (keywords is None): keywords = {} qtag, qct = query q_backup_title = qct.title qct.title = qtag std_qct = _standardize(qct, std_res, reorder) if asl: align_atoms = analyze.evaluate_asl(std_qct, asl) if not align_atoms: st_str = "query structure: %s" % qtag raise ValueError("No atoms matched the ASL in the %s" % st_str) std_qct = std_qct.extract(align_atoms, copy_props=True) results = [] for tag, ct in templist: t_backup_title = ct.title ct.title = tag std_tct = _standardize(ct, std_res, reorder) if asl_mobile: align_atoms = analyze.evaluate_asl(std_tct, asl_mobile) if not align_atoms: st_str = "template structure: %s" % tag raise ValueError("No atoms matched the ASL in the %s" % st_str) std_tct = std_tct.extract(align_atoms, copy_props=True) try: output = run_align([std_qct, std_tct], keywords, log, debug) (u, x) = output.rotmat[0] # NOTE when -asl is used, "ct" still has the original full structure transform_structure(ct, u, x) # perform in-place transformation except Exception as err: raw_output = '\n\nWARNING: Skipping failed alignment: %s, %s\n' % ( qct.title, ct.title) raw_output += " Exception: %s\n\n" % err output = _SkaData([None], {}, {}, None, None, raw_output) if save_props: if output.psd: ct.property['r_psp_StructAlign_Score'] = output.psd if output.rmsd: ct.property['r_psp_StructAlign_RMSD'] = output.rmsd ct.property['s_psp_StructAlign_Reference'] = qtag output.setTransformationMatrixProperty(ct) results.append(output) ct.title = t_backup_title qct.title = q_backup_title return results
[docs]def multiple_align_ct(query, templist, keywords=None, log=None, debug=False, save_props=False, std_res=False, reorder=False): """ Wrapper for a single multiple-alignment ska job with CTs as input. NOTE: For higher-level API, use schrodinger.structutils.structalign module. Arguments are the same as in `pairwise_align_ct` above. :return: a single multiple alignment :rtype: `_SkaData` """ # TODO Add asl option # FIXME factor out common code with pairwise_align_ct() if (keywords is None): keywords = {} qtag, qct = query q_backup_title = qct.title qct.title = qtag std_qct = _standardize(qct, std_res, reorder) strucs = [std_qct] for tag, ct in templist: std_tct = _standardize(ct, std_res, reorder) std_tct.title = tag strucs.append(std_tct) output = run_align(strucs, keywords, log, debug) # carry out corresponding rotation for each template for i, ((u, x), (tag, ct)) in enumerate(zip(output.rotmat, templist)): transform_structure(ct, u, x) # perform in-place transformation if save_props: if output.psd: ct.property['r_psp_StructAlign_Score'] = output.psd if output.rmsd: ct.property['r_psp_StructAlign_RMSD'] = output.rmsd ct.property['s_psp_StructAlign_Reference'] = qtag output.setTransformationMatrixProperty(ct, i) qct.title = q_backup_title return output
[docs]def run_align(structs, keywords, log=None, debug=False): """ Core driver routine, equivalent to `$SCHRODINGER/ska` with all top-level input options specified in `keywords`. Returns alignment, scores, and rotation matrices as well as backend stdout. NOTE: For higher-level API, use schrodinger.structutils.structalign module. :param structs: structures to align. First in list is query; the rest of CTs will be aligned to it (templates). :type structs: list(schrodinger.structure.Structure) :param keywords: top-level keyword-value input options :type keywords: dict :param log: active logger for diagnostic messages :type log: logging.logger :param debug: debug flag passed to backend :type debug: bool :return: a single multiple alignment :rtype: `_SkaData` """ global _trolltop # Swap the titles of CTs with reasonable substitutes, if there are problems need_swap = False used_titles = set() for ct in structs: if (len(ct.title) >= 40 or " " in ct.title or ct.title in used_titles): need_swap = True used_titles.add(ct.title) if (need_swap): original_titles = dict() for i, ct in enumerate(structs): ct.property['s_psp_ska_orig_title'] = ct.title ct.title = "ct" + str(i) original_titles[ct.title] = ct.property['s_psp_ska_orig_title'] # Clean up input structures: eliminate chains which contain no amino acids # and would be invalid inputs to SKA aminoacid = [ "ALA ", "ARG ", "ASN ", "ASP ", "CYS ", "GLN ", "GLU ", "GLY ", "HIS ", "ILE ", "LEU ", "LYS ", "MET ", "PHE ", "PRO ", "SER ", "THR ", "TRP ", "TYR ", "VAL " ] # Make a list of CT indices and titles of strutures with no valid residues: poplist = [] poptitles = [] for i, CT in enumerate(structs): valid = False for res in CT.residue: if (aminoacid.__contains__(res.pdbres)): valid = True break if (not valid): poplist.append(i) poptitles.append(CT.title) if (i == 0): raise IOError("Query structure has no valid amino acid " + "residues.") # Remove structures that have no valid residues: npop = len(poplist) for i in range(npop - 1, -1, -1): structs.pop(poplist[i]) # for backwards compatibility even though no other formats # are implemented if keywords.get('ALIFORMAT', 'native') != 'native': raise RuntimeError('Unknown value specified for alignment format') if len(structs) < 2: raise RuntimeError('Query and template files not specified') # limitation imposed by use of unique chains in backend output if len(structs) > 52: raise RuntimeError('Maximum of 52 structures exceeded') strucfile = keywords.get('ALISTRUCS_OUTFILE', None) # provide a default output stream if none provided if log is None: from schrodinger.utils.log import get_logger log = get_logger() # extract template names (e.g. PDB codes) from CT titles tags = [ct.title for ct in structs] log.info('Using input structures: ' + ', '.join(tags)) auto = False if _true_val.match(keywords.get('USE_AUTOMATIC_SETTINGS', 'no')): if len(structs) > 2: log.warn('Automatic settings ignored for multiple alignment') else: log.info('Using automatic settings') auto = True if _trolltop is None: try: _trolltop = os.environ['TROLLTOP'] except KeyError: mmshare_data = hunt('mmshare', 'data') if not mmshare_data: raise RuntimeError('Unable to locate data files') _trolltop = os.path.join(mmshare_data, 'ska', 'allh.top') if os.path.exists(_trolltop): log.debug('Using topology file: ' + _trolltop) else: raise RuntimeError('Topology file %s appears to be missing' % _trolltop) options = {'top': _trolltop} if debug: options['verr'] = True # convert top-level keywords to corresponding backend command-line flags try: if 'PSD_THRESHOLD' in keywords: options['p'] = float(keywords['PSD_THRESHOLD']) if 'SWITCH' in keywords and _true_val.match(keywords['SWITCH']): options['switch'] = True if 'ORDER' in keywords and keywords['ORDER'] == 'seed': options['seed'] = True if 'CLEAN' in keywords and _true_val.match(keywords['CLEAN']): options['clean'] = True if 'RECKLESS' in keywords and _true_val.match(keywords['RECKLESS']): options['reckless'] = True runopts = [] if auto: # use defaults for everything other than '-w' options['i'] = 2.0 options['g'] = 1.0 options['s'] = 1.0 options['minlength'] = 2 for v in _winlen_vals: options['w'] = v runopts.append(dict(options)) # force a deep copy else: options['i'] = float(keywords.get('GAP_OPEN', 2.0)) options['g'] = float(keywords.get('GAP_DEL', 1.0)) options['s'] = float(keywords.get('SSE_MINSIM', 1.0)) options['minlength'] = int(keywords.get('SSE_MINLEN', 2)) # default is to not include '-w' flag on command-line if 'SSE_WINLEN' in keywords: options['w'] = int(keywords['SSE_WINLEN']) runopts.append(options) except ValueError as e: raise RuntimeError('Illegal value found in input file: %s' % e.args[0].split()[-1]) results = [] for options in runopts: # run backend executable for each run try: if strucfile: tmppdb = os.path.basename(tempfile.mktemp('.pdb', 'ska_', '.')) options['o'] = tmppdb else: tmppdb = None text = 'Running ska with arguments:' for key in sorted(list(options)): if key == 'top': # this is output in debug mode continue if options[key] is True: text += ' -%s' % key else: text += ' -%s %s' % (key, str(options[key])) log.info(text) # backend takes unordered list of CTs as input to multiple alignment # options correspond to command-line flags for stand-alone # executable # The backend appears to have problems if the numebr of characters # in the title is more than 39 characters. This appears to be # related to the fact that there code gets enacted if margin > 40 # in SequenceList::BuildOutput # # Make copies of the cts with standard titles structs_to_run = [ct.copy() for ct in structs] map_new_to_orig_title = {} for ict, ct in enumerate(structs_to_run): new_title = "Structure%08d" % ict map_new_to_orig_title[new_title] = ct.title ct.title = new_title ska_raw_output, ska_transf, ska_index = _ska.run_ska( [ct.handle for ct in structs_to_run], options) # Convert the outputs to look like what they would with original # titles (minus the crashes) transf = {} index = {} raw_output = ska_raw_output # Get the maximum length of the original titles to aid # in formatting the output max_title_len = max([ len(map_new_to_orig_title[k]) for k in list(map_new_to_orig_title) ]) max_orig_title_len = max( [len(k) for k in list(map_new_to_orig_title)]) for new_title, orig_title in map_new_to_orig_title.items(): if new_title in ska_transf: transf[orig_title] = ska_transf[new_title] if new_title in ska_index: index[orig_title] = ska_index[new_title] new_output = [] for line in raw_output.split("\n"): # Adjust the spacing of the ruler to match # the change in the sequence if line.startswith(" " * max_orig_title_len): line = line.replace(" " * max_orig_title_len, " " * max_title_len, 1) # Adjust the names of the sequences for new_title, orig_title in map_new_to_orig_title.items(): line = line.replace(new_title, ("%-" + str(max_title_len) + "s") % orig_title) new_output.append(line) raw_output = "\n".join(new_output) # Done with ct title manipulation code psd = None rmsd = None align = dict(list(zip(tags, [''] * len(tags)))) sse_map = dict(list(zip(tags, [''] * len(tags)))) sse_tags = {f'tc_sse:{tag}': tag for tag in tags} ready = False for line in raw_output.splitlines(): # in case of verbose output, scroll down to actual alignment if '..........+' in line: ready = True if not ready: continue field = line.split() if not field: continue first_word = field[0] if first_word in tags: align[first_word] += field[-1] elif first_word in sse_tags: sse_map[sse_tags[first_word]] += field[-1] continue try: if first_word == 'PSD:': psd = float(field[1]) elif first_word == 'RMSD:': rmsd = float(field[1]) except ValueError: raise RuntimeError( 'Error parsing alignment scores in output') # sanity check for aligned sequences, since arrays are # initialized for all template tags ali_len = 0 for s in align.values(): if ali_len: if len(s) != ali_len: raise RuntimeError( 'Error parsing sequence alignment in output') else: ali_len = len(s) for s in sse_map.values(): if len(s) != ali_len: raise RuntimeError( 'Error parsing secondary structure in output') try: log.info('\tPSD: %f, RMSD: %f' % (psd, rmsd)) except TypeError: log.info('\tNo alignment score available') results.append( (raw_output, transf, index, tmppdb, align, psd, rmsd, sse_map)) except SystemError: raise RuntimeError('Fatal error in ska backend') # pick best score if running multiple jobs with automatic settings if len(results) > 1: (raw_output, transf, index, tmppdb, align, psd, rmsd, sse_map) = sorted(results, key=operator.itemgetter(5))[0] log.debug('Selecting best result by PSD: %f' % psd) else: (raw_output, transf, index, tmppdb, align, psd, rmsd, sse_map) = results[0] # sanity check for rotation matrix data rotmat = [] try: refmat = numpy.array(transf[tags[0]]) except KeyError: refmat = None for tag in tags[1:]: try: mat = numpy.array(transf[tag]) if mat.shape != (4, 3): raise RuntimeError('Invalid rotation matrix for template: ' + tag) rotmat.append((mat[:3], mat[3])) except KeyError: if refmat is None: raise RuntimeError('No rotation matrix found for template: ' + tag) else: # this template was used as the reference for alignment rotmat.append((numpy.eye(3), numpy.zeros((3,)))) # transform to query frame if alignment was reordered if refmat is not None: refu = refmat[:3].transpose() refx = refmat[3] for i in range(len(rotmat)): u, x = rotmat[i] u = numpy.dot(refu, u) x = numpy.dot(refu, x - refx) rotmat[i] = (u, x) # only need to carry out rotations here if output structures requested # (for backwards compatibility, otherwise calling script can use returned # matrices directly) if strucfile: shutil.copy(tmppdb, strucfile) log.info('Raw structures written to file: ' + strucfile) temfiles = keywords.get('TEMPLATE', []) # Remove structures with invalid residues: # FIXME removing by titles won't work correctly in cases where same # title is used by multiple CTs - or even when the beginning of a title # is shared. poplist = [] i = 0 for tf in temfiles: for ttl in poptitles: if (tf.startswith(ttl)): poplist.append(i) i += 1 for i in range(len(poplist) - 1, -1, -1): temfiles.pop(poplist[i]) # filenames for modified structures rotfiles = ['rot-' + f for f in temfiles] # enumerate over templates using matching-length lists # (rotmat is a list of tuples) for r, f, (u, x) in zip(rotfiles, temfiles, rotmat): try: tfile = open(f, 'r') except: raise RuntimeError('Unable to read template file: ' + f) try: rfile = open(r, 'w') except: raise RuntimeError('Unable to create rotated file: ' + r) try: # exact copy of entire file (including ligands, etc) only # changing coordinates for line in tfile: data = _atom_rec.match(line) if data: try: # coordinates crds = numpy.array( list(map(float, data.group(3, 4, 5)))) # everything else in line pre, post = data.group(1, 6) crds = numpy.inner(crds, u) + x rfile.write(pre + '%8.3f%8.3f%8.3f' % tuple(crds.tolist()) + post + '\n') except TypeError: raise RuntimeError( 'Invalid data found in template file: ' + f) else: rfile.write(line) finally: rfile.close() tfile.close() log.debug('Creating rotated template file: ' + r) else: log.info('Rotated PDB files not generated') if strucfile and not debug: for r in results: force_remove(r[3]) # Replace the CT titles and delete the CT-level properties # used to store them, if there were any problems if (need_swap): for ct in structs: ct.title = ct.property['s_psp_ska_orig_title'] del ct.property['s_psp_ska_orig_title'] # Perform some surgery on _SkaData arguments to ensure that # the results can be parsed later. ska_out_disclaimer = "# Notice: the SKA output below utilizes a " + \ "different set of structure titles\n# " + \ "than were originally specified. These " + \ "changes were made because\n# the " + \ "titles contained either spaces or unusually " + \ "long names. To\n# ensure that SKA " + \ "could properly process the structures, the\n" + \ "# following name key was implemented:\n" for i, ct in enumerate(structs): align[ct.title] = align["ct" + str(i)] index[ct.title] = index["ct" + str(i)] del align["ct" + str(i)] del index["ct" + str(i)] ska_out_disclaimer += "# ct" + str(i) + ": " + ct.title + "\n" raw_output = ska_out_disclaimer + "\n" + raw_output # Get the original titles back to sse_map if the titles were swapped due to length limit or whitespace. new_sse_map = dict() for ct_sub_title in sse_map: new_sse_map[original_titles[ct_sub_title]] = sse_map[ct_sub_title] sse_map = new_sse_map # NOTE: rotated structures are not returned; but are instead written to # the CWD with the rot-* prefix if ALISTRUCS_OUTFILE was set. log.info('SKA calculation successfully completed') return _SkaData(rotmat, align, index, psd, rmsd, raw_output, sse_map)