Source code for schrodinger.protein.align

import contextlib
import enum
import itertools
from abc import ABCMeta
from abc import abstractmethod
from collections import defaultdict
from collections import namedtuple

import numpy as np
from Bio import pairwise2
from scipy.spatial import distance

from schrodinger import in_dev_env
from schrodinger import structure
from schrodinger.infra import mmerr
from schrodinger.protein import constants
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.tasks import clustal
from schrodinger.protein.tasks import sta
from schrodinger.protein.tasks import structure_alignment
from schrodinger.structutils import analyze
from schrodinger.utils import scollections

ASLResult = namedtuple("ASLResult", ["ref_ok", "other_ok", "other_skips"])


[docs]class CantAlignException(Exception): """ Exception raised when an aligner cannot start e.g. due to not enough seqs """
[docs]class AbstractAligner(object, metaclass=ABCMeta): """ Base class of objects that can perform an alignment """ def _extractGapLocations(self, seq_str): """ Convenience method to extract gap locations from a string of characters Useful for interacting with backends that return results as gapped sequences of strings """ return [ ind for (ind, char) in enumerate(seq_str) if char in constants.GAP_CHARS ]
[docs] @abstractmethod def run(self, aln): """ Aligns the sequences in an alignment using the parameters supplied on init Subclasses need to override this default implementation. :param aln: The alignment to align :type aln: `schrodinger.protein.alignment.BaseAlignment` """
def __call__(self, aln, *args, **kwargs): """ Convenience method to make the class instance itself a callable """ self.run(aln, *args, **kwargs)
[docs]class RescodeAligner(AbstractAligner): """ Aligns sequences by rescode """ def _fitSequenceToAllRescodes(self, seq, codes): current_residue_index = 0 gaps = [] for index, code in enumerate(codes): try: res = seq[current_residue_index] except IndexError: res = None if res is not None and res.is_res and res.rescode == code: current_residue_index += 1 else: gaps.append(index) return gaps
[docs] def run(self, aln): aln.removeAllGaps() all_codes = set() for seq in aln: all_codes.update(seq.annotations.rescode) all_codes.discard(None) sorted_codes = sorted(list(all_codes)) aln_gaps = [] for seq in aln: gaps = self._fitSequenceToAllRescodes(seq, sorted_codes) aln_gaps.append(gaps) aln.addGapsByIndices(aln_gaps)
class _TB(enum.IntEnum): """ An enum to keep track of the different pairwise matrices and the traceback pointers. """ Match, RefGap, OtherGap = range(3)
[docs]class AbstractPairwiseAligner(AbstractAligner): """ Abstract class for pairwise alignment where gaps can be merged into the entire alignment to preserve relative alignment of all non-reference sequences to the reference sequence. Subclasses must implement `_getPairwiseGaps` to align one sequence to the ref seq. Subclasses may override `_run` to customize aligning (e.g. validation or setup of additional data needed by `_getPairwiseGaps`) """
[docs] def __init__(self, preserve_reference_gaps=False): """ :param preserve_reference_gaps: Whether to preserve the gaps in the reference sequence. :type preserve_reference_gaps: bool """ self._preserve_reference_gaps = preserve_reference_gaps self._ref_seq = None self._other_seq = None self._aln = None self._gaps_orig = []
[docs] def run(self, aln, seqs_to_align=None, **kwargs): """ `kwargs` are additional arguments that will be passed to `_run`. :param aln: The alignment containing sequences to align. :type aln: alignment.Alignment :param seqs_to_align: The sequences in `aln` to align against the reference sequence of `aln`. If `None`, defaults to the first non-reference sequence in `aln` (ie `aln[1]`) :type seqs_to_align: list(sequence.Sequence) :raises CantAlignException: If `seqs_to_align` contains a sequence not found in `aln`. """ if len(aln) < 2: raise ValueError("Alignment must have at least two sequences to " f"align, got {len(aln)}.") self._aln = aln if seqs_to_align is None: seqs_to_align = aln[1:2] else: if any(seq not in aln for seq in seqs_to_align): raise CantAlignException( 'Sequence(s) in seqs_to_align not found in the alignment') self._ref_seq = aln.getReferenceSeq() self._run(aln, seqs_to_align, **kwargs) # Ensure that the alignment is rectangular after aligning aln.padAlignment()
def _getPairwiseGaps(self, other_seq): """ Subclasses must reimplement this method. :return: Gaps to insert into ref seq and `other_seq` to align them :rtype: tuple(list(int), list(int)) """ raise NotImplementedError def _run(self, aln, seqs_to_align, **kwargs): preserve_reference_gaps = self._preserve_reference_gaps for seq in seqs_to_align: self._setUpSequences(seq) ref_gaps, other_gaps = self._getPairwiseGaps(seq) self._mergeGaps(ref_gaps, other_gaps, preserve_reference_gaps) # When multiple sequences are selected for pairwise alignment, # after the first non-reference sequence is aligned to reference # sequence, the gaps in the reference sequence will be locked for # further alignments. preserve_reference_gaps = True def _setUpSequences(self, other_seq): """ Sanitizes sequence and initialize matrices in preparation for the alignment. :param other_seq: Sequence to be aligned the reference sequence. :type other_seq: sequence.Sequence """ # Keep a copy to merge original gaps into self._gaps_orig = [g.idx_in_seq for g in self._ref_seq.getGaps()] self._other_seq = other_seq # Sanitize sequences before aligning gaps = self._ref_seq.getGaps() + other_seq.getGaps() self._aln.removeElements(gaps) @staticmethod def _getMappedGaps(from_gaps, to_gaps): """ Utility function which calculates the offsets of gaps applied to previously-aligned sequences. :param from_gaps: Gap indices to apply to the new sequence. :type from_gaps: list(int) :param to_gaps: Existing gap indices of the targeted sequence :type to_gaps: list(int) :return: The indices of `from_gaps` offset to account for `to_gaps` :rtype: list(int) """ merged_gaps = list(from_gaps) # make a copy for i in range(len(merged_gaps)): for old_gap in to_gaps: if old_gap + i > merged_gaps[i]: break merged_gaps[i] += 1 # move the old gap forward return merged_gaps def _mergeGaps(self, ref_gaps, other_gaps, preserve_reference_gaps): """ Merge gaps into the original alignment. :param ref_gaps: Gaps to insert into `self._ref_seq` :param other_gaps: Gaps to insert into `self._other_seq` :param preserve_reference_gaps: Whether to preserve the gaps in the reference sequence.This is done to maintain the relative alignment between the reference sequence and all the other sequences. :type preserve_reference_gaps: bool """ ref_len = len(self._ref_seq) + len(ref_gaps) other_len = len(self._other_seq) + len(other_gaps) if other_len > ref_len: ref_gaps.extend(range(ref_len, other_len)) elif ref_len > other_len: other_gaps.extend(range(other_len, ref_len)) # Add original gaps displacement for reference sequence gaps ref_to_orig = self._getMappedGaps(ref_gaps, self._gaps_orig) orig_to_ref = self._getMappedGaps(self._gaps_orig, ref_gaps) other_idx = self._aln.index(self._other_seq) ref_idx = self._aln.index(self._ref_seq) gap_indices = [[] for s in self._aln] gap_indices[ref_idx] = self._gaps_orig # If we add the new other_gaps first, then we can get incorrect # behavior from terminal gaps. gap_indices[other_idx] = other_gaps self._aln.addGapsByIndices(gap_indices) gap_indices = [[] for s in self._aln] gap_indices[ref_idx] = ref_to_orig gap_indices[other_idx] = orig_to_ref self._aln.addGapsByIndices(gap_indices) if preserve_reference_gaps: gap_indices = [] for seq_idx, seq in enumerate(self._aln): cur_gap_indices = [] if seq_idx in (ref_idx, other_idx): gap_indices.append(cur_gap_indices) continue for gap_num, gap_idx in enumerate(ref_to_orig): if gap_idx > len(seq) + gap_num: # don't add gaps that are past the end of the sequence break cur_gap_indices.append(gap_idx) gap_indices.append(cur_gap_indices) self._aln.addGapsByIndices(gap_indices)
[docs]class AbstractNWPairwiseAligner(AbstractPairwiseAligner): """ Abstract class for the Needleman-Wunsch global alignment algorithm for pairwise sequence alignment with affine gap penalties. :cvar CONSTRAINT_SCORE: Reward amount for keeping constrained residues aligned :ctype CONSTRAINT_SCORE: float :cvar RES_MATCH_BONUS: Reward amount for aligning matching residues. Used by default if a substitution matrix is not specified. :ctype RES_MATCH_BONUS: float :cvar RES_MISMATCH_PENALTY: Penalty for aligning differing residues. Used by default if a subtitution matrix is not specified :ctype RES_MISMATCH_PENALTY: float """ CONSTRAINT_SCORE = 10000 RES_MATCH_BONUS = 1.0 RES_MISMATCH_PENALTY = 1.0
[docs] def __init__(self, preserve_reference_gaps=False, gap_open_penalty=1, gap_extend_penalty=0, sub_matrix=None, direct_scores=False, ss_constraints=False, penalize_end_gaps=True): """ :param preserve_reference_gaps: Whether to preserve the gaps in the reference sequence :type preserve_reference_gaps: bool :param gap_open_penalty: Penalty for opening a gap. Should be >=0. :type gap_open_penalty: float :param gap_extend_penalty: Penalty for extending a gap. Should be >=0. :type gap_extend_penalty: float :param sub_matrix: Scoring matrix to be used for the alignment. If no matrix is specified, this method uses residue identity measure. :type sub_matrix: 2D float array or dict mapping (char, char) to float :param direct_scores: Use scoring matrix directly as (NxM) where N, M are lengths of both sequences rather than default 20x20 substitution matrix. :type direct_scores: bool :param ss_constraints: Whether to constrain the alignment so no gaps appear in middle of a secondary structure. :type ss_constraints: bool :param penalize_end_gaps: Whether to penalize start/end gaps :type penalize_end_gaps: bool """ super().__init__(preserve_reference_gaps=preserve_reference_gaps) if gap_open_penalty < 0 or gap_extend_penalty < 0: raise CantAlignException('Penalty should be nonnegative') if gap_extend_penalty > gap_open_penalty: raise CantAlignException( 'Gap open penalty should be larger than gap extend penalty') # Store settings self._gap_open_penalty = gap_open_penalty self._gap_extend_penalty = gap_extend_penalty self._sub_matrix = sub_matrix self._sim_matrix = None self._direct_scores = direct_scores self._ss_constraints = ss_constraints self._penalize_end_gaps = penalize_end_gaps # Set up instance attributes self._anchored_pairs = defaultdict(list) self._constrained_pairs = {} self._constraints = []
def _run(self, aln, seqs_to_align, *, constraints=None): """ Set up constraints and suspend anchors before running the alignment :param constraints: Optional list of (ref_res, res) pairwise residue constraints. Note that these constraints will be heavily favored but are not guaranteed. Some constraints are impossible to respect simultaneously [e.g. residues at indexes (1,1) and (0,2)]. The first residue must belong to the reference sequence of the `aln`. :type constraints: list of (residue.Residue, residue.Residue) """ if not self._preserve_reference_gaps and len( self._aln.getAnchoredResidues()): raise CantAlignException( "Can't align an alignment with anchored residues without" "preserving gaps in the reference sequence.") if constraints is None: constraints = [] for res1, res2 in constraints: err_msg = ('Constraints must be a tuple of a reference residue ' 'and a non-reference residue, in that order') if res1.sequence != self._ref_seq: raise ValueError(err_msg) if res2.sequence not in aln: raise ValueError(err_msg) if res2.sequence == self._ref_seq: raise ValueError(err_msg) self._constraints = constraints self._initializeAnchorPairs(aln, seqs_to_align) with aln.suspendAnchors(): super()._run(aln, seqs_to_align, constraints=constraints) def _initializeAnchorPairs(self, aln, seqs_to_align): """ Initialize a dictionary mapping sequences to a list of anchor pairs. Anchor pairs are defined as tuples of (reference residue, anchored residue). """ _anchored_residues = defaultdict(list) for res in aln.getAnchoredResidues(): _anchored_residues[res.sequence].append(res) self._anchored_pairs = dict() for seq in seqs_to_align: anchors = _anchored_residues[seq] anchor_pairs = [(self._ref_seq[a.idx_in_seq], a) for a in anchors] self._anchored_pairs[seq] = anchor_pairs def _getMatrixValue(self, code1, code2): """ Returns the appropriate similarity score as defined by a similarity matrix. :param code1: short code of first sequence residue :type code1: char :param code2: short code of second sequence residue :type code2: char :return: value for a residue-residue pair based on a scoring matrix :rtype: float """ if self._sub_matrix is not None: value = self._sub_matrix[code1.upper(), code2.upper()] else: value = self.RES_MATCH_BONUS if code1 == code2 else -self.RES_MISMATCH_PENALTY return value
[docs]class SchrodingerPairwiseAligner(AbstractNWPairwiseAligner): """ Implementation of the Needleman-Wunsch global alignment algorithm for pairwise sequence alignment with affine gap penalties. 1) ability to merge new sequence with existing alignment, 2) ability to penalize gaps in secondary structure elements, 3) ability to use custom substitution matrix generated from a family of proteins or provided by the user. NOTE:: Any residues with variant residue types will have their short codes uppercased. This means they will be treated identically to their standard variant. If a nonstandard residue type has a lowercase short code that doesn't match its standard variant, or if we need special treatment for variant residues, _getMatrixValue will have to be changed. """
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self._sim_matrix = None self._scoring_matrix = None self._trace_matrix = None
def _setUpSequences(self, seq): super()._setUpSequences(seq) # Initialize matrices ref_len = len(self._ref_seq) other_len = len(self._other_seq) self._sim_matrix = np.zeros((ref_len, other_len)) mat_size = (ref_len + 1, other_len + 1, 3) self._scoring_matrix = np.zeros(mat_size) self._scoring_matrix[0, :, _TB.Match] = -float('inf') self._scoring_matrix[:, 0, _TB.Match] = -float('inf') self._scoring_matrix[:, 0, _TB.RefGap] = -float('inf') self._scoring_matrix[0, 1:, _TB.RefGap] = list(range(other_len)) self._scoring_matrix[0, 1:, _TB.RefGap] *= -self._gap_extend_penalty self._scoring_matrix[0, 1:, _TB.RefGap] -= self._gap_open_penalty self._scoring_matrix[0, :, _TB.OtherGap] = -float('inf') self._scoring_matrix[1:, 0, _TB.OtherGap] = list(range(ref_len)) self._scoring_matrix[1:, 0, _TB.OtherGap] *= -self._gap_extend_penalty self._scoring_matrix[1:, 0, _TB.OtherGap] -= self._gap_open_penalty self._scoring_matrix[0, 0, :] = 0 self._trace_matrix = np.full(mat_size, -1, np.int8)
[docs] def getAlignmentScore(self): """ Get the score of the alignment. Found by taking the highest value in the scoring matrix. :return: Score of the pairwise alignment. :rtype: float """ score = np.max(self._scoring_matrix[-1, -1, :]) # Offset the score for the constraint bonuses used to keep # constraints together. score -= ((len(self._constraints) + len(self._anchored_pairs[self._other_seq])) * self.CONSTRAINT_SCORE) return score
def _getPairwiseGaps(self, other_seq): """ Computes gaps to align a sequence against the reference sequence. The alignment is calculated using three matrices: R, M, and O. All have dimensions (n+1,m+1), where n is the number of residues in the reference sequence and m is the number of residues in ther "other" sequence. The values of the matrices represent:: R[i,j] - The score of the optimal alignment for the sequences ref_seq[:i] and other_seq[:j] with other_seq[j-1] aligned to a gap. M[i,j] - The score of the optimal alignment for the sequences ref_seq[:i] and other_seq[:j] with ref_seq[i-1] aligned to other_seq[j]. O[i,j] - The score of the optimal alignment for the sequences ref_seq[:i] and other_seq[:j] with ref_seq[i-1] aligned to a gap. The scores are calculated as:: R[i,j] = max( R[i-1,j] - gap_extend_penalty, M[i-1,j] - gap_open_penalty, O[i-1,j] - gap_open_penalty) M[i,j] = max( R[i-1,j-1] + score(ref_seq[i-1], other_seq[j-1]), M[i-1,j-1] + score(ref_seq[i-1], other_seq[j-1]), O[i-1,j-1] + score(ref_seq[i-1], other_seq[j-1])) O[i,j] = max( R[i-1,j] - gap_open_penalty, M[i-1,j] - gap_open_penalty, O[i-1,j] - gap_extend_penalty) `score()` is a function that typically gives a positive score if two residues are the same or similar, and negative if they're different. The function is defined based on the given substitution matrix and defaults to +1 for a match and -1 for a mismatch. :param other_seq: Sequence to align against the reference sequence :type other_seq: sequence.Sequence :raises CantAlignException: if `other_seq` is the reference sequence """ if other_seq == self._ref_seq: raise CantAlignException("Can't align sequence against itself") self._buildSimilarityMatrix() self._buildScoringMatrix() return self._traceBack() def _buildSimilarityMatrix(self): """ Calculate scores for all residue pairs in the sequence pair. """ sim_matrix = self._sim_matrix if self._sub_matrix is not None: if self._direct_scores: sim_matrix = self._sub_matrix else: codes1 = str(self._ref_seq).upper() codes2 = str(self._other_seq).upper() for (i, code1), (j, code2) in itertools.product( enumerate(codes1), enumerate(codes2)): sim_matrix[i, j] = self._getMatrixValue(code1, code2) else: same_idxs = ( [i, j] for (i, j) in itertools.product(range(len(self._ref_seq)), range(len(self._other_seq))) if str(self._ref_seq[i]).upper() == str( self._other_seq[j]).upper()) # Create row, column index lists for numpy array selection for i, j in same_idxs: sim_matrix[i, j] = self.RES_MATCH_BONUS + self.RES_MISMATCH_PENALTY sim_matrix -= self.RES_MISMATCH_PENALTY # Prevent pairwise constraints from separating anchor_pairs = self._anchored_pairs[self._other_seq] constraints = self._constraints + anchor_pairs for (res1, res2) in constraints: if res2.sequence != self._other_seq: continue sim_matrix[res1.idx_in_seq, res2.idx_in_seq] += self.CONSTRAINT_SCORE def _buildScoringMatrix(self): """ Iteratively builds the score matrix. See the docstring for `_getPairwiseGaps()` for information on how it's calculated. """ gap_open = self._gap_open_penalty gap_extend = self._gap_extend_penalty R = self._scoring_matrix[:, :, _TB.OtherGap] M = self._scoring_matrix[:, :, _TB.Match] O = self._scoring_matrix[:, :, _TB.RefGap] # noqa: E741 Rtb = self._trace_matrix[:, :, _TB.OtherGap] Mtb = self._trace_matrix[:, :, _TB.Match] Otb = self._trace_matrix[:, :, _TB.RefGap] sim_matrix = self._sim_matrix for (i, j) in itertools.product(range(1, len(self._ref_seq) + 1), range(1, len(self._other_seq) + 1)): # yapf: disable candidates = np.full((3,), -np.inf) candidates[_TB.OtherGap] = R[i - 1, j] - gap_extend candidates[_TB.Match] = M[i - 1, j] - gap_open candidates[_TB.RefGap] = O[i - 1, j] - gap_open R[i, j] = np.max(candidates) Rtb[i, j] = np.argmax(candidates) # Add an extra 0 to account for new subalignments starting at # this position (this is how we differentiate between global alignment) candidates = np.zeros((3,)) candidates[_TB.OtherGap] = R[i - 1, j - 1] + sim_matrix[i - 1, j - 1] # yapf: disable candidates[_TB.Match] = M[i - 1, j - 1] + sim_matrix[i - 1, j - 1] candidates[_TB.RefGap] = O[i - 1, j - 1] + sim_matrix[i - 1, j - 1] candidates[_TB.Match] += self._ssConstraintBonus(i, j) M[i, j] = np.max(candidates) Mtb[i, j] = np.argmax(candidates) candidates = np.full((3,), -np.inf) candidates[_TB.OtherGap] = R[i, j - 1] - gap_open candidates[_TB.Match] = M[i, j - 1] - gap_open candidates[_TB.RefGap] = O[i, j - 1] - gap_extend O[i, j] = np.max(candidates) Otb[i, j] = np.argmax(candidates) def _ssConstraintBonus(self, i, j): """ Determine whether the current residues in either sequence should get a score bonus for the matching matrix due to being part of a secondary structure :param i: The one-based index of the ref_seq to consider. :type i: int :param j: The one-based index of the other_seq to consider. :type j: int :return: 0 if we're not constraining secondary structures or neither residue is a part of a secondary structure. Otherwise, self.CONSTRAINT_SCORE :rtype: float """ if self._ss_constraints: helix_or_strand = {structure.SS_HELIX, structure.SS_STRAND} res1 = self._ref_seq[i - 1] res2 = self._other_seq[j - 1] if (res1.secondary_structure in helix_or_strand or res2.secondary_structure in helix_or_strand): return self.CONSTRAINT_SCORE return 0.0 def _traceBack(self): """ Traces back through highest alignment path and finds optimal gap positions. """ i, j = len(self._ref_seq), len(self._other_seq) k = np.argmax(self._scoring_matrix[-1, -1, :]) ref_gaps = [] other_gaps = [] while i != 0 and j != 0: tb = self._trace_matrix[i, j, k] if k == _TB.OtherGap: other_gaps.append(j) i -= 1 elif k == _TB.Match: i -= 1 j -= 1 elif k == _TB.RefGap: ref_gaps.append(i) j -= 1 else: raise RuntimeError('The value for k is undefined.') k = tb # Add starting gaps if abs(j - i) > 0: if j > i: ref_gaps.extend([0] * (j - i)) else: other_gaps.extend([0] * (i - j)) # Reverse gaps and convert from absolute indices to relative indices. # Absolute gap indices don't account for gaps that come before it # while relative gaps do. ref_gaps = [i + pos for (pos, i) in enumerate(reversed(ref_gaps))] other_gaps = [i + pos for (pos, i) in enumerate(reversed(other_gaps))] return ref_gaps, other_gaps
[docs]class BiopythonPairwiseAligner(AbstractNWPairwiseAligner): """ Pairwise alignment using Biopython. NOTE:: Any residues with variant residue types will have their short codes uppercased. This means they will be treated identically to their standard variant. If a nonstandard residue type has a lowercase short code that doesn't match its standard variant, or if we need special treatment for variant residues, _getMatrixValue will have to be changed. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self._sub_matrix is None: # If no sub matrix is defined, we just use an identity substitution # matrix using all short codes. ALL_RES_TYPES = (residue.AMINO_ACIDS + residue.NUCLEOBASES + residue.CAPPING_GROUPS) short_codes = [restype.short_code for restype in ALL_RES_TYPES] self._sub_matrix = {} for c1 in short_codes: for c2 in short_codes: if c1 == c2: value = self.RES_MATCH_BONUS else: value = -self.RES_MISMATCH_PENALTY self._sub_matrix[c1, c2] = value
[docs] def getAlignmentScore(self): """ Get the score of the alignment. Found by taking the highest value in the scoring matrix. :return: Score of the pairwise alignment. :rtype: float """ score = self._score # Offset the score for the constraint bonuses used to keep # constraints together. score -= ((len(self._constraints) + len(self._anchored_pairs[self._other_seq])) * self.CONSTRAINT_SCORE) return score
def _getMatrixValue(self, res1, res2): """ Returns the score for aligning `res1` and `res2`. These can either be characters or `residue.Residue`s. If they're `residue.Residue` objects, then we check if they're matching anchor residues and return a large score if they are. Otherwise, we just use their short-codes by calling `str(res).upper()`. WARNING:: This is called /A LOT/ by Biopython's aligner, so if any changes need to be made, make sure that performance is still reasonable. :param res1: A residue to calculate a score for :type res1: char or residue.Residue :param res2: A residue to calculate a score for :type res2: char or residue.Residue :return: alignment score :rtype: float """ try: return self._sub_matrix[res1, res2] except KeyError: try: if self._constrained_pairs[res1] == res2: return self.CONSTRAINT_SCORE except KeyError: pass return self._sub_matrix[str(res1).upper(), str(res2).upper()] def _transformSequenceForBiopython(self, seq): """ Takes a `sequence.Sequence` and returns a list or string to pass to Biopython's aligner. Subclasses may want to override this. For the BiopythonPairwiseAligner, residues are converted into their short codes unless they're anchor residues, in which case the actual `residue.Residue` object is used. This representation allows for fast computation in the usual case where `_getMatrixValue` is called with two non-anchor residues. Calling `_getMatrixValue` with an anchor residue will result in a key error since the the substitution matrix only has string keys. By using EAFP, we get to save on an attribute and instance check each time `_getMatrixValue` is called with non-anchor residues, which results in a large performance gain in the majority of cases. :param seq: a sequence to encode for Biopython :type seq: sequence.Sequence :return: a list of characters and residues :rtype: list(char or residue.Residue) """ return [ str(res).upper() if res not in self._constrained_pairs else res for res in seq ] def _getPairwiseGaps(self, other_seq): """ Computes gaps to align a sequence against the reference sequence. :param other_seq: Sequence to align against the reference sequence :type other_seq: sequence.Sequence :raises CantAlignException: if `other_seq` is the reference sequence """ if other_seq == self._ref_seq: raise CantAlignException("Can't align sequence against itself") ref_gaps = [] other_gaps = [] if len(self._ref_seq) != 0 and len(other_seq) != 0: self._constrained_pairs = {} anchor_pairs = self._anchored_pairs[self._other_seq] constraints = self._constraints + anchor_pairs for ref_res, other_res in constraints: self._constrained_pairs[ref_res] = other_res self._constrained_pairs[other_res] = ref_res ref_seq = self._transformSequenceForBiopython(self._ref_seq) other_seq = self._transformSequenceForBiopython(other_seq) alignments = pairwise2.align.globalcs( ref_seq, other_seq, self._getMatrixValue, -self._gap_open_penalty, -self._gap_extend_penalty, gap_char=['~'], one_alignment_only=True, penalize_end_gaps=self._penalize_end_gaps) aln_ref_seq_str, aln_other_seq_str, self._score, *_ = alignments[0] ref_gaps = self._extractGapLocations(aln_ref_seq_str) other_gaps = self._extractGapLocations(aln_other_seq_str) elif len(self._ref_seq) == 0 == len(other_seq): self._score = 0 elif len(self._ref_seq) == 0: self._score = -self._gap_open_penalty - ( len(other_seq) - 1) * self._gap_extend_penalty ref_gaps = list(range(len(other_seq))) elif len(other_seq) == 0: self._score = -self._gap_open_penalty - ( len(self._ref_seq) - 1) * self._gap_extend_penalty other_gaps = list(range(len(ref_gaps))) return ref_gaps, other_gaps
[docs]class PrimeSTAAligner(AbstractAligner): """ Sequence alignment using $SCHRODINGER/sta """
[docs] def __init__(self, protein_family=None): """ :param protein_family: 'GPCR' for specialized alignment or None for default templates. :type protein_family: str or NoneType """ if protein_family not in (None, 'GPCR'): raise ValueError(f"Invalid protein_family {protein_family}") self._protein_family = protein_family
[docs] def run(self, aln, structured_seq=None, constraints=None): """ :param aln: The alignment containing sequences to align. :type aln: alignment.Alignment :param structured_seq: Structured sequence to use as reference. If None, the first non-reference seq will be aligned. :type structured_seq: sequence.ProteinSequence or NoneType :param constraints: Pairs of (reference_seq, structured_seq) residues to constrain :type constraints: list(tuple(residue.Residue, residue.Residue)) or NoneType """ # TODO MSV-2728 allow aligning more than one query sequence if structured_seq is None: sta_ref_idx = 1 structured_seq = aln[sta_ref_idx] else: sta_ref_idx = aln.index(structured_seq) if not structured_seq.hasStructure(): raise CantAlignException( f'Sequence {sta_ref_idx + 1} ({structured_seq.fullname}) must ' 'have structure in order to align with STA') with _StructureOnlyAlignment(aln): query_seq = aln.getReferenceSeq() query_idx = aln.index(query_seq) task = sta.STATask() task.input.ref_seq = structured_seq task.input.query_seq = query_seq task.input.protein_family = self._protein_family if constraints is not None: task.input.constraints = constraints task.start() task.wait() # TODO PANEL-18317 if task.status is not task.DONE: if in_dev_env(): print("Exception type:", type(task.failure_info.exception)) print(task.failure_info.traceback) print(task.failure_info.message) print("STDOUT") print(task.stdout) print("STDERR") print(task.stderr) raise RuntimeError("STA failed") ref_gaps, query_gaps = task.getGaps() # Apply the gaps to the sequences: gap_indices = [[] for s in aln] gap_indices[sta_ref_idx] = ref_gaps gap_indices[query_idx] = query_gaps aln.addGapsByIndices(gap_indices) StructurelessGapAligner().run(aln, [query_seq, structured_seq]) aln.padAlignment()
[docs]class ClustalAligner(AbstractAligner): """ Aligns sequences using the Clustal alignment algorithm. """
[docs] def run(self, aln): """ Aligns the sequences in an alignment :param aln: The alignment to align :type aln: `schrodinger.protein.alignment.BaseAlignment` """ job = clustal.ClustalJob(aln) new_aln = job.run() if new_aln is not None: new_gaps = new_aln.getGaps() new_gap_indices = [ [g.idx_in_seq for g in seq_gaps] for seq_gaps in new_gaps ] aln.removeAllGaps() aln.addGapsByIndices(new_gap_indices)
[docs]class SuperpositionAligner(BiopythonPairwiseAligner): """ Align structured sequences based on their superposition. """
[docs] def __init__(self, gap_open_penalty=None, gap_extend_penalty=None): kwargs = dict(sub_matrix=None, direct_scores=True, preserve_reference_gaps=False, ss_constraints=False) if gap_open_penalty is not None: kwargs['gap_open_penalty'] = gap_open_penalty if gap_extend_penalty is not None: kwargs['gap_extend_penalty'] = gap_extend_penalty super().__init__(**kwargs) self._ref_ca_positions = None
def _getMatrixValue(self, res1, res2): """ Returns the score for aligning `res1` and `res2`. We reimplement this so we can just look up the score in our own custom sub matrix which has the direct scores between every pair of residues based on distance. This will also return a large bonus for aligning anchored residues. :param res1: A residue to calculate a score for :type res1: residue.Residue :param res2: A residue to calculate a score for :type res2: residue.Residue :return: alignment score :rtype: float """ try: if self._constrained_pairs[res1] == res2: return self.CONSTRAINT_SCORE except KeyError: pass return self._sub_matrix[res1, res2] def _transformSequenceForBiopython(self, seq): """ Encode the sequence for Biopython by just passing a list of the residues contained by the sequence. :param seq: a sequence to encode for Biopython :type seq: sequence.Sequence :return: a list of characters and residues :rtype: list(char or residue.Residue) """ return list(seq) def _run(self, aln, seqs_to_align, **kwargs): """ Check for structured sequences and precompute ref CA positions before aligning. """ ref_seq = aln.getReferenceSeq() if not ref_seq.hasStructure() or not all(seq.hasStructure() for seq in seqs_to_align): raise CantAlignException( 'Sequence(s) must have structures in order to ' 'align by superposition') if len(seqs_to_align) < 1: raise CantAlignException( "At least two structures must be present to run an alignment.") aln.removeAllGaps() self._ref_ca_positions = self._getCAPositionsFromSeq(self._ref_seq) super()._run(aln, seqs_to_align, **kwargs) def _getPairwiseGaps(self, other_seq): """ Align using a custom sub-matrix based on the CA positions of `other_seq` """ other_ca_positions = self._getCAPositionsFromSeq(other_seq) self._sub_matrix = self._getMatrixFromSuperposition( self._ref_ca_positions, other_ca_positions) return super()._getPairwiseGaps(other_seq) def _getMatrixFromSuperposition(self, ca_positions_1, ca_positions_2): """ Create scoring matrix based on residue distance. :param ca_positions_1: An iterable of CA positions with None for structureless residues :type ca_positions_1: list(tuple(float, float, float) or None) :param ca_positions_2: Another iterable of CA positions :type ca_positions_2: list(tuple(float, float, float) or None) :return: A scoring matrix :rtype: list(list(float)) """ score_matrix = {} # Build score matrix for (res1, ca_pos_1), (res2, ca_pos_2) in itertools.product( zip(self._ref_seq, ca_positions_1), zip(self._other_seq, ca_positions_2)): if ca_pos_1 is None or ca_pos_2 is None: dist2 = 999 # dummy "far" distance for missing coords else: dist2 = distance.sqeuclidean(ca_pos_1, ca_pos_2) # Convert close dist to high score and far dist to low score score = 20.0 / (1.0 + dist2 / 25.0) score_matrix[res1, res2] = score return score_matrix @classmethod def _getCAPositionsFromSeq(cls, seq): """ Get XYZ coordinates representing the sequence. :param seq: Sequence :type seq: sequence.ProteinSequence :return: A list of alpha carbon XYZ coordinates. If the residue does not have an alpha carbon, the first atom will be used. If the residue has no atoms, its coordinates will be None. :rtype: list(tuple(float, float, float) or None) """ struct_residues = (seq.getStructureResForRes(res) for res in seq) return [ cls._getCAPosition(struct_res) if struct_res is not None else struct_res for struct_res in struct_residues ] @staticmethod def _getCAPosition(struct_res): """ :type struct_res: schrodinger.structure._Residue """ atom = struct_res.getAlphaCarbon() if atom is None: atom = next(iter(struct_res.atom), None) if atom is not None: pos = (atom.x, atom.y, atom.z) return pos return None
[docs]class AbstractStructureAligner(AbstractAligner): """ Subclasses must reimplement `run`: - Call `_setUpSeqs` to set up instance attributes for the current alignment - Call `_setASLs` to validate and store ASLs - Call `_getUniqueEidSeqs` to get the sequences to align - Call `_runStructureAlignment` to call the backend """ Result = namedtuple("Result", ["ref_seq", "other_seq", "psd", "rmsd"])
[docs] def __init__(self, keywords=None, **kwargs): """ :param keywords: Keywords to pass to the ska backend :type keywords: dict """ super().__init__(**kwargs) if keywords is None: keywords = dict() self.keywords = keywords self._aln = None self._ref_asl = None self._other_asl = None self._unique_eid_seqs = None self._eid_name_dict = None self._align_results = None
[docs] def getResultSeqs(self): if self._align_results is None: return [] seq_pairs = [] ref_seq = self._aln.getReferenceSeq() ref_seq_name = ref_seq.name ref_eid = ref_seq.entry_id name_eid_dict = {name: eid for eid, name in self._eid_name_dict.items()} for name, result in self._align_results.items(): ref_seq = self._makeSequence(result.ref_string, result.ref_sse) ref_seq.name = ref_seq_name ref_seq.entry_id = ref_eid other_seq = self._makeSequence(result.other_string, result.other_sse) other_seq.name = structure_alignment.get_orig_seq_name(name) other_seq.entry_id = name_eid_dict[name] seq_pairs.append( self.Result(ref_seq, other_seq, result.psd, result.rmsd)) return seq_pairs
def _setUpSeqs(self, aln, seqs_to_align): self._aln = aln self._validateSeqsToAlign(seqs_to_align) self._unique_eid_seqs = self._getUniqueEidSeqs(aln.getReferenceSeq(), seqs_to_align) def _setASLs(self, ref_asl, other_asl): self._raiseForInvalidASLs(ref_asl, other_asl) self._ref_asl = ref_asl self._other_asl = other_asl def _raiseForInvalidASLs(self, ref_asl, other_asl): """ :param ref_asl: Reference ASL :type ref_asl: str or NoneType :param other_asl: Non-reference ASL :type other_asl: str or NoneType :raises CantAlignException: If either ASL is invalid """ messages = [] if not self._isValidASL(ref_asl): messages.append(f"Reference ASL is invalid: {ref_asl}") if not self._isValidASL(other_asl): messages.append(f"Non-reference ASL is invalid: {other_asl}") if messages: raise CantAlignException("\n".join(messages)) @staticmethod def _isValidASL(asl): """ :rtype: bool """ if asl is None: return True with mmerr.Capture(): return analyze.validate_asl(asl) @staticmethod def _makeSequence(seq_str, sse_str): seq = sequence.ProteinSequence(seq_str) for res, sse in zip(seq, sse_str): if res.is_gap: continue res.secondary_structure = constants.SSA_MAP[sse] return seq @staticmethod def _validateSeqsToAlign(seqs_to_align): if any(seq.entry_id is None for seq in seqs_to_align): msg = ("One or more sequence(s) has an undefined entry_id. This " "aligner can only be used with sequences imported by " "structure_model.") raise CantAlignException(msg) @staticmethod def _getUniqueEidSeqs(ref_seq, seqs_to_align): """ Get structured protein sequences with unique entry IDs :param ref_seq: Reference sequence :param seqs_to_align: Sequences to align :return: iterable[sequence.ProteinSequence] """ unique_eid_seqs = {ref_seq.entry_id: ref_seq} for seq in seqs_to_align: # TODO MSV-1504 use isinstance once NucleicAcidSequence does not # inherit ProteinSequence if (type(seq) not in (sequence.ProteinSequence, sequence.CombinedChainProteinSequence) or not seq.hasStructure()): continue entry_id = seq.entry_id if entry_id in unique_eid_seqs: continue unique_eid_seqs[entry_id] = seq return unique_eid_seqs.values() def _runStructureAlignment(self, seqs): """ :param seqs: Sequences with different structures (i.e. different entry_id) """ names_and_seqs = list(structure_alignment.get_unique_seq_names(seqs)) self._eid_name_dict = { seq.entry_id: name for name, seq in names_and_seqs } names_and_strucs = [ (name, seq.getStructure()) for name, seq in names_and_seqs ] result = structure_alignment.runStructureAlignment( names_and_strucs[0], names_and_strucs[1:], keywords=self.keywords, ref_asl=self._ref_asl, other_asl=self._other_asl) for (_, seq), (_, st) in zip(names_and_seqs, names_and_strucs): seq.setStructure(st) self._align_results = result
[docs]class StructureAligner(AbstractStructureAligner): """ Run structure alignment using the specified sequences to create chain ASLs """
[docs] def run(self, aln, seqs_to_align, **kwargs): self._setUpSeqs(aln, seqs_to_align) ref_seq = aln.getReferenceSeq() ref_eid = ref_seq.entry_id ref_chains = {ref_seq.structure_chain} other_asl_parts = defaultdict(set) for seq in seqs_to_align: eid = seq.entry_id chain = seq.structure_chain if eid == ref_eid: ref_chains.add(chain) else: other_asl_parts[eid].add(chain) ref_asl = self._makeASL(ref_eid, ref_chains) other_asl = " or ".join( self._makeASL(eid, chains) for eid, chains in other_asl_parts.items()) self._setASLs(ref_asl, other_asl) self._runStructureAlignment(self._unique_eid_seqs)
@staticmethod def _makeASL(eid, chains): """ Create an ASL for the given entry ID and one or more chains :param eid: Entry ID for the ASL :param chains: Entry chains for the ASL :return: The resulting ASL :rtype: str :raise ValueError: If the ASL is invalid """ asl = f'(entry.id {eid} and chain.name {",".join(chains)})' return asl
[docs]class CustomASLStructureAligner(AbstractStructureAligner): """ Run structure alignment using specified ASLs """ SENTINEL = object()
[docs] def __init__(self, keywords=None, ref_asl=None, other_asl=None): super().__init__(keywords=keywords) self._setASLs(ref_asl, other_asl) self._asl_result = None
[docs] def evaluateASLs(self, aln, seqs_to_align): """ Determine whether the ASLs match any atoms in the sequences' structures :param aln: Alignment :param seqs_to_align: Sequences to align :rtype: ASLResult """ if aln is not self.SENTINEL and seqs_to_align is not self.SENTINEL: # Internal call passes SENTINEL to indicate it already set up seqs self._setUpSeqs(aln, seqs_to_align) ref_seq = self._aln.getReferenceSeq() if self._ref_asl is None: ref_ok = True else: ref_atoms = analyze.evaluate_asl(ref_seq.getStructure(), self._ref_asl) ref_ok = bool(ref_atoms) other_skips = [] if self._other_asl is None: other_ok = True else: other_ok = False for seq in self._unique_eid_seqs: if seq == ref_seq: continue seq_atoms = analyze.evaluate_asl(seq.getStructure(), self._other_asl) if seq_atoms: other_ok = True else: other_skips.append(seq) result = ASLResult(ref_ok=ref_ok, other_ok=other_ok, other_skips=other_skips) self._asl_result = result return result
[docs] def run(self, aln, seqs_to_align, **kwargs): self._setUpSeqs(aln, seqs_to_align) if self._asl_result is None: # Don't evaluate ASLs again if evaluateASLs was called before # calling run result = self.evaluateASLs(self.SENTINEL, self.SENTINEL) if not (result.ref_ok and result.other_ok): # Reset result to force evaluation if run is called again self._asl_result = None raise CantAlignException("No atoms matched the ASL") seqs = [ seq for seq in self._unique_eid_seqs if seq not in self._asl_result.other_skips ] # Reset result to force evaluation if run is called again self._asl_result = None self._runStructureAlignment(seqs)
class _StructureOnlyAlignment(contextlib.AbstractContextManager): """ Context manager to temporarily remove all structureless residues from the alignment :cvar LAST_RES_REF: Sentinel object to represent residues at the end of a sequence :vartype LAST_RES_REF: object """ LAST_RES_REF = object() structureless_block = namedtuple("structureless_block", ["next_residue", "subsequence"]) def __init__(self, aln): self.aln = aln def __enter__(self): """ Remove and cache structureless residues """ res_to_remove = [] self.all_res_to_add = scollections.DefaultIdDict(list) for seq in self.aln: if not seq.hasStructure(): continue for structureless_block in self._getStructurelessBlocks(seq): structureless_subseq = structureless_block.subsequence # Store the residues to remove res_to_remove.extend(structureless_subseq) # Store the removed residues and a reference to re-add self.all_res_to_add[seq].append(structureless_block) self.aln.removeElements(res_to_remove) return self.aln def __exit__(self, exc_type, exc, exc_tb): """ Restore structureless residues """ for seq, res_to_add in self.all_res_to_add.items(): for structureless_block in res_to_add: next_res = structureless_block.next_residue residues = structureless_block.subsequence if next_res == self.LAST_RES_REF: # Blocks at the end of the sequence have no next res res_i = len(seq) else: res_i = next_res.idx_in_seq self.aln.addElements(seq, res_i, residues) def _getStructurelessBlocks(self, seq): """ Find continuous blocks of structureless residues and the next structured res, if any. :return: Iterable of (next structured res, structureless residues) :rtype: iterable(namedtuple(Residue, list(Residue))) """ structureless_subseq = [] for res in seq: if not res.hasStructure() and res.is_res: structureless_subseq.append(res) elif structureless_subseq: yield self.structureless_block(res, structureless_subseq) structureless_subseq = [] if structureless_subseq: # Blocks at the end of the sequence have no next res yield self.structureless_block(self.LAST_RES_REF, structureless_subseq)
[docs]class MaxIdentityAligner(BiopythonPairwiseAligner): """ Pairwise aligner that maximizes the number of matching residues between two sequences. There are no penalties for mismatches or gaps. """ class _IdentityDict(dict): """ Dictionary that takes 2-tuples and returns 1 if the elements of the tuple are the same and 0 otherwise. """ def __getitem__(self, key): assert isinstance(key, tuple) assert len(key) == 2 if key[0] == key[1]: return 1 else: return 0
[docs] def __init__(self): super().__init__(gap_open_penalty=0, gap_extend_penalty=0, sub_matrix=self._IdentityDict())
[docs] def run(self, aln): if len(aln) != 2: raise ValueError(f'{self.__class__.__name__} can only align pairs ' 'of sequences.') super().run(aln)
[docs]class StructurelessGapAligner(AbstractAligner): """ Align all structureless residues with gaps For example, given the following alignment (where circled letters are structureless residues): Resnum: 0 1 2 3 4 5 Seq1: Ⓐ Ⓡ Ⓒ A D E Seq2: Ⓒ Ⓐ Ⓝ A D A The result will be: Resnum: 0 1 2 3 4 5 6 7 8 Seq1: ~ ~ ~ Ⓐ Ⓡ Ⓒ A D E Seq2: Ⓒ Ⓐ Ⓝ ~ ~ ~ A D A """
[docs] def run(self, aln, seqs_to_align=None): if seqs_to_align is None: seqs_to_align = aln n_seqs = len(aln) col_idx = 0 while col_idx <= aln.num_columns: column = aln.getColumn(col_idx) structureless_seq_idx = None for seq_idx in reversed(range(n_seqs)): seq = aln[seq_idx] if not seq.hasStructure() or seq not in seqs_to_align: continue res = column[seq_idx] if res is not None and res.is_res and not res.hasStructure(): structureless_seq_idx = seq_idx break if structureless_seq_idx is not None: # Add a gap for this structureless res to all other seqs new_gaps = [[] for _ in aln] for s_idx, seq in enumerate(aln): if s_idx != structureless_seq_idx and col_idx < len(seq): new_gaps[s_idx].append(col_idx) aln.addGapsByIndices(new_gaps) col_idx += 1