Source code for schrodinger.protein.alignment

"""
Classes for working with sequences containing alignment information (gaps) and
collections thereof.

Copyright Schrodinger, LLC. All rights reserved.
"""

import collections
import contextlib
import copy
import enum
import itertools
import types
from bisect import bisect

from schrodinger.application.msv import utils as msv_utils
from schrodinger.models import json
from schrodinger.protein import annotation
from schrodinger.protein import properties
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.Qt import QtCore

ResidueSimilarity = enum.Enum('ResidueSimilarity',
                              ['Identical', 'Similar', 'Dissimilar', 'NA'])


[docs]class AnchoredResidueError(RuntimeError): """ Exception to indicate that an action would break anchors. When possible, the specific anchors that block the action are stored on the exception instance. :ivar blocking_anchors: Anchors that block the action or `ALL_ANCHORS` if all anchors block the action. :vartype blocking_anchors: list[residue.Residue] or object """ ALL_ANCHORS = object()
[docs] def __init__(self, message=None, blocking_anchors=ALL_ANCHORS): """ :param message: Exception message :type message: str :param blocking_anchors: Anchored residues that block the action :type blocking_anchors: list[residue.Residue] """ super().__init__(message) self.blocking_anchors = blocking_anchors
[docs]class StructuredResidueError(RuntimeError): pass
[docs]class AlignmentSignals(QtCore.QObject): """ A collection of signals that can be emitted by an alignment :ivar domainsChanged: TODO :vartype domainsChanged: `QtCore.pyqtSignal` :ivar sequencesAboutToBeInserted: A signal emitted before sequences are inserted into the alignment. Emitted with: (The index of the first sequence to be inserted, The index of the last sequence to be inserted) :vartype sequencesAboutToBeInserted: `QtCore.pyqtSignal` :ivar sequencesInserted: A signal emitted after sequences are inserted into the alignment. Emitted with: (The index of the first sequence inserted, The index of the last sequence inserted) :vartype sequencesInserted: `QtCore.pyqtSignal` :ivar sequencesAboutToBeRemoved: A signal emitted before sequences are removed from the alignment. Emitted with: (The index of the first sequence to be removed, The index of the last sequence to be removed) :vartype sequencesAboutToBeRemoved: `QtCore.pyqtSignal` :ivar sequencesRemoved: A signal emitted after sequences are removed from the alignment. Emitted with: (The index of the first sequence removed, The index of the last sequence removed) :vartype sequencesRemoved: `QtCore.pyqtSignal` :ivar sequenceResiduesChanged: A signal emitted after the contents of a sequence have changed. Note that this signal may also be emitted in response to a sequence changing length, as positions in the alignment may switch from blank to occupied or vice versa. :vartype sequenceResiduesChanged: `QtCore.pyqtSignal` :ivar sequencesAboutToBeReordered: Signal emitted before reordering sequences :type: sequencesAboutToBeReordered: `QtCore.pyqtSignals` :ivar sequencesReordered: Signal emitted after sequences have been reordered :type: sequencesReordered: `QtCore.pyqtSignals` :ivar sequenceNameChanged: A signal emitted after a sequence has changed names. Emitted with: (The modified sequence) :vartype sequenceNameChanged: `QtCore.pyqtSignal` :ivar annotationTitleChanged: A signal emitted after a sequence's annotation has changed titles. Emitted with: (The sequence whose annotation title has been modified) :vartype annotationTitleChanged: `QtCore.pyqtSignal` :ivar alignmentNumColumnsAboutToChange: A signal emitted before the alignment changes length. Emitted with: (The current length of the alignment, The new length of the alignment) :vartype alignmentNumColumnsAboutToChange: `QtCore.pyqtSignal` :ivar alignmentNumColumnsChanged: A signal emitted after the alignment changes length. Emitted with: (The old length of the alignment, The current length of the alignment) :vartype alignmentNumColumnsChanged: `QtCore.pyqtSignal` :ivar residuesAboutToBeRemoved: A signal emitted before residues are to be removed. Emitted with a list of the residues to be removed. :vartype residuesAboutToBeRemoved: `QtCore.pyqtSignal` :ivar residuesRemoved: A signal emitted after residues are removed. This signal is not emitted with any parameters, but the residues that were removed were listed with the corresponding residuesAboutToBeRemoved signal. :vartype residuesRemoved: `QtCore.pyqtSignal` :ivar residuesAdded: A signal emitted with added residues. Note that this signal will be only be emitted once even if residues are added to multiple sequences. In addition, each individual sequence will emit a lengthChanged signal. :vartype residuesAdded: `QtCore.pyqtSignal` :ivar sequenceVisibilityChanged: A signal emitted when visibility of a sequence changes. Emitted with: (the sequence whose visibility is changing, the index of the sequence) :vartype sequenceVisibilityChanged: `QtCore.pyqtSignal` :ivar sequenceStructureChanged: A signal emitted when structure of a sequence changes. Emitted with: (the sequence whose visibility is changing, the index of the sequence) :vartype sequenceStructureChanged: `QtCore.pyqtSignal` :ivar alignmentAboutToBeCleared: A signal emitted just before all sequences are removed from the alignment. :vartype alignmentAboutToBeCleared: `QtCore.pyqtSignal` :ivar alignmentCleared: A signal emitted just after all sequences have been removed from the alignment. :vartype alignmentCleared: `QtCore.pyqtSignal` :ivar anchoredResiduesChanged: A signal emitted when one or more residues are anchored or unanchored. :vartype alignmentCleared: `QtCore.pyqtSignal` :ivar alnSetsChanged: A signal emitted when the alignment set for one or more sequences changes. :vartype alnSetsChanged: QtCore.pyqtSignal """ domainsChanged = QtCore.pyqtSignal() invalidatedDomains = QtCore.pyqtSignal() descriptorsCleared = QtCore.pyqtSignal() sequencesAboutToBeInserted = QtCore.pyqtSignal(int, int) sequencesInserted = QtCore.pyqtSignal(int, int) sequencesAboutToBeRemoved = QtCore.pyqtSignal(int, int) sequencesRemoved = QtCore.pyqtSignal(int, int) sequenceResiduesChanged = QtCore.pyqtSignal() sequencesAboutToBeReordered = QtCore.pyqtSignal() sequencesReordered = QtCore.pyqtSignal(list) sequenceNameChanged = QtCore.pyqtSignal(sequence.Sequence) annotationTitleChanged = QtCore.pyqtSignal(sequence.Sequence) alignmentNumColumnsAboutToChange = QtCore.pyqtSignal(int, int) alignmentNumColumnsChanged = QtCore.pyqtSignal(int, int) residuesAboutToBeRemoved = QtCore.pyqtSignal(list) residuesRemoved = QtCore.pyqtSignal() residuesAdded = QtCore.pyqtSignal(list) sequenceVisibilityChanged = QtCore.pyqtSignal(sequence.Sequence, int) sequenceStructureChanged = QtCore.pyqtSignal(sequence.Sequence, int) alignmentAboutToBeCleared = QtCore.pyqtSignal() alignmentCleared = QtCore.pyqtSignal() anchoredResiduesChanged = QtCore.pyqtSignal() alnSetChanged = QtCore.pyqtSignal() secondaryStructureChanged = QtCore.pyqtSignal() predictionsChanged = QtCore.pyqtSignal() pfamChanged = QtCore.pyqtSignal() kinaseFeaturesChanged = QtCore.pyqtSignal() kinaseConservationChanged = QtCore.pyqtSignal() @property def aln(self): """ Return the alignment that this signals object is reporting for. :rtype: BaseAlignment """ return self.parent()
[docs] @QtCore.pyqtSlot() def emitSeqResChanged(self): # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) self.sequenceResiduesChanged.emit()
[docs] @QtCore.pyqtSlot() def emitSeqNameChanged(self): # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) self.sequenceNameChanged.emit(self.sender())
[docs] @QtCore.pyqtSlot() def emitAnnTitleChanged(self): # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) self.annotationTitleChanged.emit(self.sender())
[docs] def allSignals(self): """ Iterate over all signals in this object in alphabetical order. :rtype: Iter(QtCore.pyqtBoundSignal) """ for signal, signal_name in self.allSignalsAndNames(): yield signal
[docs] def allSignalsAndNames(self): """ Iterate over all signals in this object and their names in alphabetical order. :rtype: Iter(tuple(QtCore.pyqtBoundSignal, str)) """ for attr_name in sorted(dir(self)): attr = getattr(self, attr_name) if isinstance(attr, QtCore.pyqtBoundSignal): yield attr, attr_name
[docs]class BaseAlignment(QtCore.QObject): """ Abstract base class for classes which handle alignment of various sequences and corresponding annotations. This is a pure domain object intended to make it easy to work with aligned collections of sequences. Some methods are decorated with @msv_utils.const in order to make it easy to write a wrapper for this class that supports undo/redo operations. :cvar _ALN_ANNOTATION_CLASS: The class for alignment annotations. This value should be overridden in subclasses. :vartype _ALN_ANNOTATION_CLASS: type :cvar _SEQ_ANNOTATION_CLASS: The class for sequence annotations. This value should be overridden in subclasses. :vartype _SEQ_ANNOTATION_CLASS: type """ _ALN_ANNOTATION_CLASS = None _SEQ_ANNOTATION_CLASS = None
[docs] def __init__(self, sequences=None): """ :param sequences: An optional iterable of sequences :type sequences: list """ super().__init__() # Count for number of times we've entered a suspendAnchors context. self._suspend_anchors_count = 0 self._modifying_structure = False self.signals = AlignmentSignals(self) self._sequences = [] self._anchored_residues = collections.defaultdict(set) self._name_to_set = {} self._seq_to_set = {} self._set_id_counter = 0 self._num_columns = 0 self._seq_positions_cache = {} self._col_same_res_cache = None self._res_similarity_cache = None self._ref_type_cache = None self._seqs_matching_ref_type_cache = None self._RO_anchored_residues = None # read-only set of anchored residues # read-only set of anchored residue plus the corresponding reference # residues self._RO_anchored_residues_with_ref = None if self._ALN_ANNOTATION_CLASS is None: self._annotations = None self._global_annotations = [] else: self._annotations = self._ALN_ANNOTATION_CLASS(self) self._global_annotations = \ self._ALN_ANNOTATION_CLASS.ANNOTATION_TYPES if self._SEQ_ANNOTATION_CLASS is None: self._seq_annotations = [] else: self._seq_annotations = self._SEQ_ANNOTATION_CLASS.ANNOTATION_TYPES self.signals.sequencesAboutToBeRemoved.connect( self._clearAnchorsFromSeqs) self.signals.anchoredResiduesChanged.connect(self.clearAllCaching) cache_invalidating_signals = [ self.signals.sequencesRemoved, self.signals.anchoredResiduesChanged, self.signals.sequencesReordered, self.signals.alignmentNumColumnsChanged, self.signals.alignmentNumColumnsAboutToChange, self.signals.sequenceResiduesChanged, self.signals.sequencesInserted, self.signals.residuesRemoved, self.signals.residuesAdded ] for signal in cache_invalidating_signals: signal.connect(self._resetCaches) for signal in (self.signals.sequencesRemoved, self.signals.sequencesInserted, self.signals.sequencesReordered): signal.connect(self._resetSeqTypeCache) if sequences is not None: if any(isinstance(seq, str) for seq in sequences): sequences = [ sequence.ProteinSequence(seq_str) if isinstance( seq_str, str) else seq_str for seq_str in sequences ] self.addSeqs(sequences)
############################################################################ # Magic methods ############################################################################
[docs] @msv_utils.const def __len__(self): """ Returns the number of sequences in the alignment """ return len(self._sequences)
@msv_utils.const def __iter__(self): """ Returns an iterable of the sequences held in the alignment """ return iter(self._sequences)
[docs] @msv_utils.const def __contains__(self, seq): """ Returns whether the sequence is present in the alignment """ return seq in self._sequences
@msv_utils.const def __getitem__(self, index): """ Returns the sequence at the index in the alignment """ try: return self._sequences[index] except IndexError: msg = ("index (%d) out of range (alignment length is only %d)" % (index, len(self))) raise IndexError(msg) # TODO MSV-1907: Move magic methods into one contiguous block. @msv_utils.const def _getTextFormattedAlignment(self): """ Returns a formatted list of strings with detailed alignment information """ element_pad = 3 v_header_pad = 15 # pad all the strings in the list by a given amount def pad_list(lst, amount): return [i.ljust(amount) for i in lst] seq_data = collections.OrderedDict() ruler = [str(i) for i in range(self._num_columns)] ruler = pad_list(ruler, element_pad) seq_data["Resnum:"] = ruler for num, seq in enumerate(self, 1): name = seq.name if not name: name = "Seq" + str(num) name += ":" elements = [] for res in seq: if res.is_gap: res_str = seq.gap_char else: res_str = str(res) if not res.hasStructure() and res.is_res: res_str = residue.box_letter(res_str) elements.append(res_str) elements = pad_list(elements, element_pad) seq_data[name] = elements to_format = [] # break every stride elements and start a new line. This keeps lines # about 75 characters long stride = 20 for i in range(0, self._num_columns, stride): if i != 0: to_format.append("\n...cont...\n") for title, rows in seq_data.items(): if len(rows) < self._num_columns: # prevent index errors padding_needed = self._num_columns - len(rows) rows += ["".ljust(element_pad)] * padding_needed line = title.ljust(v_header_pad) + "".join(rows[i:i + stride]) to_format.append(line) return to_format @msv_utils.const def _getAlignmentDescription(self): """ Returns a formatted description of the alignment """ seq_lengths = ", ".join([str(len(seq)) for seq in self]) class_name = self.__class__.__name__ num_seqs = str(len(self)) info = "{class_name} containing {num_seqs} sequences" if len(self) > 0: info += " (of respective lengths: {lengths})" info = info.format(class_name=class_name, num_seqs=num_seqs, lengths=seq_lengths) return info @msv_utils.const def __str__(self): """ Returns a str representation of the alignment This is fairly detailed since it is very useful for debugging """ to_format = self._getTextFormattedAlignment() info = self._getAlignmentDescription() return info + "\n\n" + "\n".join(to_format) def __repr__(self): """ :rtype: str :return: A str representation of the alignment """ return self._getRepr(type(self).__qualname__) def _getRepr(self, classname): aln_repr = [ classname, "([", ", ".join([repr(seq) for seq in self]), "])" ] return "".join(aln_repr) def __deepcopy__(self, memo): """ We should be able to copy an alignment, getting back an alignment with all the essential data in the original alignment and sharing no references with it """ sequences = copy.deepcopy(self._sequences, memo) Cls = self.__class__ aln = Cls(sequences) self._copyAnchoringTo(aln) self._copyAlnSetsTo(aln) return aln @msv_utils.const def _copyAnchoringTo(self, aln): """ Copy all anchored residues from this alignment to the given alignment. :param aln: The alignment to copy anchoring to. :type aln: BaseAlignment """ anchored_residues = self.getAnchoredResidues() new_anchored_res = self._mapResidues(anchored_residues, aln) aln.anchorResidues(new_anchored_res) @msv_utils.const def _copyAlnSetsTo(self, aln): """ Copy all alignment sets from this alignment to the given alignment. :param aln: The alignment to copy alignment sets to. :type aln: BaseAlignment """ for aln_set in self._name_to_set.values(): copied_seqs = [aln[self.index(seq)] for seq in aln_set] aln._addSeqsToAlnSetNoReordering(copied_seqs, aln_set.name) # make sure that the set ids match so that the colors are the same aln._name_to_set[aln_set.name].set_id = aln_set.set_id aln._set_id_counter = self._set_id_counter @msv_utils.const def _mapResidues(self, og_residues, new_alignment): """ Given residues and a new alignment, get the residues in the same position in the new alignment. """ return [ new_alignment[self.index(res.sequence)][res.idx_in_seq] for res in og_residues ] ############################################################################ # Annotations ############################################################################ @property def annotations(self): return self._annotations @property def global_annotations(self): """ Returns the alignment-level annotations available for the alignment """ return self._global_annotations @property def seq_annotations(self): """ Returns the sequence-level annotations available for sequences held in the alignment """ return self._seq_annotations
[docs] @msv_utils.const def getGlobalAnnotationData(self, index, annotation): """ Returns column-level annotation data at an index in the alignment :param index: The index in the alignment :type index: int :param annotation: An enum representing the requested annotation, if any :type annotation: `enum.Enum` """ annotation_name = annotation.name data_array = getattr(self.annotations, annotation_name) try: return data_array[index] except IndexError: return
############################################################################ # Sequences and signals ############################################################################ @property def num_columns(self): return self._num_columns
[docs] @msv_utils.const def getWorkspaceCounts(self): """ Summarize the visibility status of the alignment's sequences :returns: Counts of each type of visibility :rtype: collections.Counter """ return collections.Counter(seq.visibility for seq in self)
@QtCore.pyqtSlot() @msv_utils.const def _sequenceVisibilityChanged(self): """ Notify any listeners that a request to change visibility of an entire sequence has been made. """ # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) seq = self.sender() self.signals.sequenceVisibilityChanged.emit(seq, self.index(seq)) @QtCore.pyqtSlot() @msv_utils.const def _sequenceStructureChanged(self): """ Notify any listeners that the structure corresponding to a sequence has changed. """ # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) seq = self.sender() self.signals.sequenceStructureChanged.emit(seq, self.index(seq))
[docs] @msv_utils.const def index(self, seq): """ Returns the index of the specified sequence. :param seq: The requested sequence :type seq: `sequence.Sequence` :rtype: int :return: The index of the requested sequence """ return self._seq_positions[seq]
[docs] def reorderSequences(self, seq_indices): """ Reorder the sequences in the alignment using the specified list of indices. In the undoable version of this class, the private function is needed to perform the operation in an undoable operation. :param seq_indices: A list with the new indices for sequences :type: list of int :raises ValueError: In the event that the list of indices does not match the length of the alignment """ if len(seq_indices) != len(self): msg = ("The number of elements in seq_indices should match the " "number of sequences in the alignment") raise ValueError(msg) self.signals.sequencesAboutToBeReordered.emit() new_sequences = [self._sequences[i] for i in seq_indices] self._sequences = new_sequences self.signals.sequencesReordered.emit(seq_indices)
[docs] def sortByProperty(self, seq_prop, reverse=False): """ Sort the sequences by a sequence property. Sequences that do not have the sequence property defined will be grouped at the end of the alignment (regardless of `reverse`) :type seq_prop: properties.SequenceProperty """ sort_indices = self._getSortByPropertyIndices(seq_prop, reverse=reverse) self.reorderSequences(sort_indices)
def _getSortByPropertyIndices(self, seq_prop, reverse=False): def sort_key(seq): val = seq.getProperty(seq_prop) if val is None: val = float('inf') if reverse: val = -float('inf') return val return self._getSortIndices(key=sort_key, reverse=reverse)
[docs] def sort(self, *, key, reverse=False): """ Sort the alignment by the specified criteria. NOTE: Query sequence is not included in the sort. :param key: A function that takes a sequence and returns a value to sort by for each sequence. (required keyword-only argument) :type key: function :param reverse: Whether to sort in reverse (descending) order. :type reverse: bool """ sort_indices = self._getSortIndices(key, reverse=reverse) self.reorderSequences(sort_indices) return sort_indices
def _getSortIndices(self, key, reverse=False): ''' Helper method that gets the sequence indices for a sort. :param key: A function that returns a key to sort by. :type key: function :param reverse: Whether to sort in reverse (descending) order. :type order: bool :raises ValueError: If given something other than a SortBy enum. ''' # Extract key values and zip with indices if len(self) == 0: return [] else: # Skip index 0 since that's the query sequence key_values = [(key(self[i]), i) for i in range(1, len(self))] key_values.sort(key=lambda x: x[0], reverse=reverse) sort_indices = [0] + [kv[1] for kv in key_values] return sort_indices # TODO MSV-1910: Consider renaming length to numColumns? @msv_utils.const def _recalculateNumColumns(self, omit, extra_length=None): """ Determine what the length of the alignment would be if we removed sequences. :param omit: The sequences to omit :type omit: Iterable[sequence.Sequence] :param extra_length: An additional sequence length to include in the calculation :type extra_length: int :return: The calculated alignment length if the length is going to change. None otherwise. :rtype: int or NoneType """ omitted = [seq for seq in self._sequences if seq not in omit] if omitted: new_length = max(map(len, omitted)) else: new_length = 0 if extra_length is not None and extra_length > new_length: new_length = extra_length if new_length != self._num_columns: return new_length def _monitorSequence(self, seq): """ Monitor changes in the specified sequence. :param seq: The sequence to monitor :type seq: `sequence.Sequence` """ for signal, slot in self._getSeqSignalsAndSlots(seq): signal.connect(slot) def _stopMonitoringSequence(self, seq): """ Stop monitoring for changes in the specified sequence. :param seq: The sequence to stop monitoring :type seq: `sequence.Sequence` """ for signal, slot in self._getSeqSignalsAndSlots(seq): signal.disconnect(slot) def _getSeqSignalsAndSlots(self, seq): """ :return: pairs of (sequence signal, alignment slot) :rtype: tuple(tuple()) """ signals = self.signals return ((seq.residuesChanged, signals.emitSeqResChanged), (seq.annotations.domainsChanged, signals.domainsChanged), (seq.annotations.invalidatedDomains, signals.invalidatedDomains), (seq.descriptorsCleared, signals.descriptorsCleared), (seq.nameChanged, signals.emitSeqNameChanged), (seq.annotationTitleChanged, signals.emitAnnTitleChanged), (seq.lengthAboutToChange, self._sequenceLengthAboutToChange), (seq.lengthChanged, self._sequenceLengthChanged), (seq.visibilityChanged, self._sequenceVisibilityChanged), (seq.structureChanged, self._sequenceStructureChanged), (seq.secondaryStructureChanged, signals.secondaryStructureChanged), (seq.predictionsChanged, signals.predictionsChanged), (seq.kinaseFeaturesChanged, signals.kinaseFeaturesChanged), (seq.kinaseConservationChanged, signals.kinaseConservationChanged), (seq.pfamChanged, signals.pfamChanged)) # yapf: disable @QtCore.pyqtSlot(int, int) def _sequenceLengthAboutToChange(self, old_seq_length, new_seq_length): """ Respond to a sequence `lengthAboutToChange` signal by emitting `signals.alignmentNumColumnsAboutToChange` if necessary. :param old_seq_length: The current length of the sequence :type old_seq_length: int :param new_seq_length: The new length of the sequence :type new_seq_length: int """ # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) seq = self.sender() new_num_columns = self._checkAlignmentNumColumns( seq, old_seq_length, new_seq_length) if new_num_columns is not None: self.signals.alignmentNumColumnsAboutToChange.emit( self._num_columns, new_num_columns) @QtCore.pyqtSlot(int, int) def _sequenceLengthChanged(self, old_seq_length, new_seq_length): """ Respond to a sequence `lengthChanged` signal by emitting `signals.alignmentNumColumnsChanged` and `signals.sequenceResiduesChanged` as necessary. :param old_seq_length: The old length of the sequence :type old_seq_length: int :param new_seq_length: The current length of the sequence :type new_seq_length: int """ # We use self.sender() to retrieve the sequence instead of using a # functools.partial as a slot due to a memory leak (MSV-1538) seq = self.sender() new_num_columns = self._checkAlignmentNumColumns( seq, old_seq_length, new_seq_length) if new_num_columns is not None: old_num_columns = self._num_columns self._num_columns = new_num_columns self.signals.alignmentNumColumnsChanged.emit( old_num_columns, new_num_columns) if old_seq_length < new_seq_length: if old_seq_length < old_num_columns: self.signals.sequenceResiduesChanged.emit() else: if new_seq_length < new_num_columns: self.signals.sequenceResiduesChanged.emit() else: self.signals.sequenceResiduesChanged.emit() # TODO MSV_1907: Move closer to _recalculateNumColumns def _checkAlignmentNumColumns(self, seq, old_seq_length, new_seq_length): """ Determine what the length of the alignment will be if we changed the length of the specified sequence :param seq: The sequence that will change length :type seq: `sequence.Sequence` :param old_seq_length: The old length of the sequence :type old_seq_length: int :param new_seq_length: The new length of the sequence :type new_seq_length: int :return: If the alignment will change length, returns the new alignment length. Otherwise, returns None. :rtype: int or NoneType """ if new_seq_length > self._num_columns: return new_seq_length elif old_seq_length == self._num_columns: new_num_columns = self._recalculateNumColumns([seq], new_seq_length) return new_num_columns @msv_utils.const def _assertCanAddSeqs(self, index): if index is not None and index > len(self): raise ValueError("Cannot insert sequences at position past end")
[docs] def addSeq(self, seq, index=None): """ :param seq: The sequence to add :type seq: `sequence.Sequence` :param start: The index at which to insert; if None, seq is appended :type start: int """ self.addSeqs([seq], index)
[docs] def addSeqs(self, sequences, start=None): """ Add multiple sequences to the alignment :param sequences: Sequences to add :type sequences: list of `sequence.Sequence` :param start: The index at which to insert; if None, seqs are appended :type start: int """ if not sequences: return self._assertCanAddSeqs(start) # first, change the alignment length if necessary old_num_columns = self._num_columns seq_length = max(map(len, sequences)) if seq_length > old_num_columns: self.signals.alignmentNumColumnsAboutToChange.emit( old_num_columns, seq_length) self._num_columns = seq_length self.signals.alignmentNumColumnsChanged.emit( old_num_columns, seq_length) # then insert the sequences if start is None: start = len(self._sequences) last_new_index = start + len(sequences) - 1 self.signals.sequencesAboutToBeInserted.emit(start, last_new_index) for index, seq in enumerate(sequences, start=start): self._sequences.insert(index, seq) self._monitorSequence(seq) self.signals.sequencesInserted.emit(start, last_new_index)
[docs] def removeSeq(self, seq): """ Remove a sequence from the alignment :param seq: The sequence to remove :type seq: `sequence.Sequence` """ self.removeSeqs([seq])
[docs] def removeSeqs(self, seqs): """ Remove multiple sequences from the alignment """ seqs = set(seqs) if not seqs: return if self.getReferenceSeq() in seqs: self._assertCanRemoveRef() blocks, num_found = self._findSeqBlocks(seqs) if num_found != len(seqs): raise ValueError("Not all sequences present in alignment.") self._additionalSeqRemovalSteps(seqs) self._removeSeqsFromAlnSetNoReordering(seqs) self._changeNumColumnsForRemovedSeqs(seqs) self._removeSeqBlocks(blocks)
def _findSeqBlocks(self, seqs): """ Divide up the given sequences into blocks of contiguous sequences based on their positions in the alignment. :param seqs: The sequences to divide into blocks. :type seqs: set :return: A tuple of: - A list of (start index, end index) for blocks containing the specified sequences. This list is sorted in ascending order. - The number of sequences that were found in the alignment. If this is less than `len(seqs)`, then some of the input sequences are not present in the alignment. :rtype: tuple(list(tuple(int, int)), int) """ blocks = [] num_found = 0 block_start = None for i, seq in enumerate(self): if seq in seqs: num_found += 1 if block_start is None: block_start = i elif block_start is not None: blocks.append((block_start, i - 1)) block_start = None if block_start is not None: blocks.append((block_start, len(self) - 1)) return blocks, num_found def _additionalSeqRemovalSteps(self, seqs): """ The is a hook for subclasses to carry out any additional sequence removal operations. It will be called after we have confirmed that `seqs` are safe to remove from the alignment but before the sequences are actually removed. :param seqs: The sequences that will be removed from the alignment. :type seqs: set(sequence.Sequence) """ pass def _changeNumColumnsForRemovedSeqs(self, seqs_to_remove): """ Update the number of columns in the alignment in preparation for removing the specified sequences. :param seqs_to_remove: The sequences that will be removed from the alignment. :type seqs_to_remove: set(sequence.Sequence) """ new_num_columns = self._recalculateNumColumns(seqs_to_remove) if new_num_columns is None: return self.signals.alignmentNumColumnsAboutToChange.emit( self._num_columns, new_num_columns) old_num_columns = self._num_columns self._num_columns = new_num_columns self.signals.alignmentNumColumnsChanged.emit(old_num_columns, new_num_columns) def _removeSeqBlocks(self, blocks): """ Remove sequences at the specified indices in the alignment. :param blocks: A list of (start index, end index) for blocks to remove. This list must be sorted in ascending order. :type blocks: list[tuple(int, int)] """ for start, end in reversed(blocks): self.signals.sequencesAboutToBeRemoved.emit(start, end) for seq in self._sequences[start:end + 1]: self._stopMonitoringSequence(seq) del self._sequences[start:end + 1] self.signals.sequencesRemoved.emit(start, end) @msv_utils.const def _assertCanRemoveRef(self): """ If any anchored residues are present, raise an exception stating that we can't remove the reference sequence. """ if self.getAnchoredResidues(): raise AnchoredResidueError("Can't remove reference sequence while " "there are anchored residues") @QtCore.pyqtSlot() def _resetCaches(self): """ Utility function to simultaneously reset caches. """ self._seq_positions_cache.clear() self._col_same_res_cache = None self._res_similarity_cache = None self._RO_anchored_residues = None self._RO_anchored_residues_with_ref = None self.annotations.clearAllCaching() @QtCore.pyqtSlot() def _resetSeqTypeCache(self): self._ref_type_cache = None self._seqs_matching_ref_type_cache = None
[docs] def clear(self): """ Clears the entire alignment of sequences """ if len(self) == 0: return self.signals.alignmentAboutToBeCleared.emit() old_num_columns = self._num_columns self.signals.alignmentNumColumnsAboutToChange.emit(old_num_columns, 0) old_num_sequences = len(self._sequences) - 1 self.signals.sequencesAboutToBeRemoved.emit(0, old_num_sequences) for seq in self._sequences: self._stopMonitoringSequence(seq) self._sequences = [] self.signals.sequencesRemoved.emit(0, old_num_sequences) if old_num_columns > 0: self._num_columns = 0 self.signals.alignmentNumColumnsChanged.emit(old_num_columns, 0) self.signals.alignmentCleared.emit()
@property def _seq_positions(self): if not self._seq_positions_cache: self._updateSequencePositionCache() return self._seq_positions_cache def _updateSequencePositionCache(self): """ Update the cache of sequences to their index within the sequence. The sequence-index cache is stored as a mapping between sequences and their indices. """ self._seq_positions_cache = {seq: i for i, seq in enumerate(self)} ############################################################################ # Operations on Sequences ############################################################################ @msv_utils.const def _getReferenceSeqReordering(self, seq): """ Returns an ordering that will make the specified sequence the reference sequence :param seq: Sequence to set as reference sequence :type seq: `sequence` :rtype: list of int :return: A list of indices """ seq_index = self.index(seq) new_seq_ordering = self._getAlnSetSeqIdxs(seq) to_skip = set(new_seq_ordering) new_seq_ordering.extend( [seq_i for seq_i in range(len(self)) if seq_i not in to_skip]) return new_seq_ordering
[docs] def setReferenceSeq(self, seq): """ Set the specified sequence as the reference sequence. :param seq: Sequence to set as reference sequence :type seq: `sequence` """ self._assertCanSetReferenceSeq() new_seq_ordering = self._getReferenceSeqReordering(seq) self.reorderSequences(new_seq_ordering)
@msv_utils.const def _assertCanSetReferenceSeq(self): """ If any anchored residues are present, raise an exception stating that we can't set a new reference sequence. """ if self.getAnchoredResidues(): raise AnchoredResidueError("Can't set reference sequence while " "there are anchored residues")
[docs] @msv_utils.const def getReferenceSeq(self): """ Returns the sequence that has been set as reference sequence or None if there is no reference sequence. :return: The reference sequence or None :rtype: `Sequence` or None """ if not self._sequences: return None return self[0]
[docs] @msv_utils.const def isReferenceSeq(self, seq): """ Return whether or not a sequence is the reference sequence. :param seq: Sequence to check :type seq: `Sequence` :return: True if the sequence is the reference sequence, False otherwise. :rtype: bool """ return self._sequences and self._sequences[0] == seq
[docs] @msv_utils.const def getResidueIndices(self, residues, sort=True): """ Returns the indices (in the alignment) of the specified residues :param residues: The list of residues and gaps to get indices for. :type residues: list[residue.AbstractSequenceElement] :param sort: Whether the returned list should be sorted. :type sort: bool :rtype: A list of (sequence index, residue index) tuples :return: list[tuple(int, int)] """ indices = [] by_seqs = collections.defaultdict(list) for res in residues: seq = res.sequence if seq is None: raise ValueError( f"Residue {res} {id(res)} does not belong to a sequence") by_seqs[seq].append(res) for seq, residues_in_seq in by_seqs.items(): seq_i = self._seq_positions[seq] res_indices = seq.indices(residues_in_seq) indices.extend((seq_i, res_i) for res_i in res_indices) if sort: indices.sort() return indices
@msv_utils.const def _assertCanRemove(self, elements): """ Check whether the specified elements can be removed :param elements: An iterable of elements. :type elements: iterable(residue.AbstractSequenceElement) :raises AnchoredResidueError: if the elements cannot be removed :raises StructuredResidueError: In the event that this method attempts to remove structured residues """ elements = set(elements) anchored_res = set(self.getAnchoredResidues()) if elements.intersection(anchored_res): err_msg = ("Can't remove an anchored residue. Unanchor the residue " "and try again") raise AnchoredResidueError(err_msg) ref_seq = self.getReferenceSeq() anchored_ref_res = {ref_seq[res.idx_in_seq] for res in anchored_res} if elements.intersection(anchored_ref_res): err_msg = ("Can't remove a reference residue while another residue " "is anchored to it.") raise AnchoredResidueError(err_msg) if (not self._modifying_structure and residue.any_structured_residues(elements)): err_msg = ("Can't remove a structured residue.") raise StructuredResidueError(err_msg)
[docs] def removeElements(self, elements): """ Removes the specified elements from the alignment. :param elements: An iterable of elements. :type elements: iterable(residue.AbstractSequenceElement) :raises AnchoredResidueError: if the elements cannot be removed :raises StructuredResidueError: In the event that this method attempts to mutate structured residues """ elements = list(elements) # in case it's exhaustible self._assertCanRemove(elements) eles_to_remove = collections.defaultdict(list) for ele in elements: eles_to_remove[ele.sequence].append(ele) self.signals.residuesAboutToBeRemoved.emit(elements) for seq, eles in eles_to_remove.items(): gap_idxs = self._getAnchorConservingGapIdxs(seq, eles) seq.removeElements(eles) seq.addGapsByIndices(gap_idxs) # We wait until after all the changes to emit the signal # Note that the signals.residuesRemoved signal is emitted once even if # elements are removed from multiple sequences. self.signals.residuesRemoved.emit()
@msv_utils.const def _getAnchorConservingGapIdxs(self, seq, eles): """ Given a sequence and a list of elements to remove, return a list of indices to add gaps that ensure all residue anchors are maintained. :param seq: The sequence to remove elements from :type seq: sequence.Sequence :param eles: A list of elements that will be removed from `seq` :type eles: list(residue.Residue) :return: A sorted list of gap indices that will maintain the alignment of residue anchors. These indices should be added immediately after `eles` are removed from `seq` :rtype: list(int) """ if seq == self.getReferenceSeq(): seq_anchors = self.getAnchoredResidues() else: seq_anchors = self._anchored_residues.get(seq, []) seq_anchors = sorted(seq_anchors, key=lambda ele: ele.idx_in_seq) seq_anchor_idxs = [ele.idx_in_seq for ele in seq_anchors] # Count up the number of removed elements before each anchor. # We add a `None` to catch any removed elements that come after any # anchors. seq_anchors.append(None) counts = collections.Counter( seq_anchors[bisect(seq_anchor_idxs, ele.idx_in_seq)] for ele in eles) # Add gaps before any anchors equal to the number of elements # removed before them. gap_idxs = [] for anchor, count in counts.items(): if anchor is None: continue anchor_idx = anchor.idx_in_seq gap_idxs.extend(range(anchor_idx - count, anchor_idx)) return gap_idxs @msv_utils.const def _assertCanMutateResidues(self, seq_i, start, end, elements): seq = self[seq_i] removed = seq[start:end] self._assertCanRemove(removed) if len(elements) > len(removed): first_extra_idx = start + len(removed) self._assertCanInsert(seq, [first_extra_idx])
[docs] def mutateResidues(self, seq_i, start, end, elements): """ Mutate a sequence. :param seq_i: Index of seq to mutate :type seq_i: int :param start: Start index of seq region to mutate :type start: int :param end: End index of seq region to mutate :type end: int :param elements: Elements to mutate to :type elements: iterable(str) or iterable(ElementClass) :raises AnchoredResidueError: if the mutation violates anchoring :raises StructuredResidueError: In the event that this method attempts to mutate structured residues """ self._assertCanMutateResidues(seq_i, start, end, elements) seq = self._sequences[seq_i] length_preserving_end = start + len(elements) self.signals.residuesAboutToBeRemoved.emit(seq[start:end]) if end <= length_preserving_end: # When more residues are added than removed, we don't need to worry # about anchors seq.mutate(start, end, elements) self.signals.residuesRemoved.emit() else: # A same-length mutate is safe for anchors seq.mutate(start, length_preserving_end, elements) to_remove = seq[length_preserving_end:end] self.removeElements(to_remove)
[docs] def replaceResiduesWithGaps(self, residues): """ Replaces the specified residues with gaps :param residues: A list of residues to replace with gaps :type residues: list """ for seq_i, res_i in self.getResidueIndices(residues): self.mutateResidues(seq_i, res_i, res_i + 1, [residue.Gap()])
[docs] def addElements(self, seq, res_i, elements): """ Adds the specified elements (residues and/or gaps) to the alignment. :param seq: A sequence in the alignment :type seq: sequence.Sequence :param res_i: Index to insert the elements :type res_i: int :param elements: elements to insert :type elements: iterable(str or residue.AbstractSequenceElement) """ self._assertCanInsert(seq, [res_i]) elements = list(elements) # In case it's exhaustable seq.insertElements(res_i, elements) self.signals.residuesAdded.emit(elements)
[docs] @msv_utils.const def getResiduesWithStructure(self): """ Returns a list of all residues with structure """ def has_structure(res): return res.is_res and not res.seqres_only seqs = (seq for seq in self if seq.hasStructure()) return list(res for res in itertools.chain(*seqs) if has_structure(res))
[docs] @msv_utils.const @contextlib.contextmanager def modifyingStructure(self): self._modifying_structure = True yield self._modifying_structure = False
############################################################################ # Residue Anchoring ############################################################################
[docs] @contextlib.contextmanager def suspendAnchors(self): """ While inside this context, all anchors will be ignored. Upon exit, the anchors will be restored and an exception will be raised if any of the anchors are not aligned to the same reference residues they were aligned to at the start. """ self._suspendAnchors() yield self._unsuspendAnchors()
def _suspendAnchors(self): self._suspend_anchors_count += 1 if self._suspend_anchors_count > 1: # If this is a nested anchor suspension, return early return self._saved_anchored_residues = self._anchored_residues self._anchor_pairs = self._getAnchorPairs() self._anchored_residues = types.MappingProxyType({}) self._RO_anchored_residues = None self._RO_anchored_residues_with_ref = None # We use a read-only dict so that any attempted changes to anchoring # will throw an error. @msv_utils.const def _getAnchorPairs(self): """ Get all anchored residues paired with the corresponding reference seqence residue. :return: A list of (reference residue, anchored residue) tuples. :rtype: list(tuple(residue.Residue, residue.Residue)) """ ref_seq = self.getReferenceSeq() return [(ref_seq[res.idx_in_seq], res) for res in self.getAnchoredResidues()] def _unsuspendAnchors(self): self._suspend_anchors_count -= 1 if self._suspend_anchors_count > 0: # If we haven't exited as many times as we've entered, # return early return self._anchored_residues = self._saved_anchored_residues del self._saved_anchored_residues self._RO_anchored_residues = None self._RO_anchored_residues_with_ref = None for ref_res, res in self._anchor_pairs: if ref_res.idx_in_seq != res.idx_in_seq: # Discard anchors because at least one is invalid self._anchored_residues.clear() raise AnchoredResidueError( 'Residue anchoring broken during suspension' f': {ref_res.idx_in_seq}, {res.idx_in_seq}') del self._anchor_pairs @msv_utils.const def _assertCanAnchor(self, residues): ref_seq = self.getReferenceSeq() if any(res.idx_in_seq >= len(ref_seq) for res in residues): raise ValueError("Can't anchor residues that extend beyond the " "length of the reference sequence.") if any(ref_seq[res.idx_in_seq].is_gap for res in residues): raise ValueError("Can't anchor to a reference gap")
[docs] def anchorResidues(self, residues): """ Anchor the specified residues. If passed reference residues, all residues aligned to the reference residues will be anchored. Anchored residues are constrained to stay aligned to the reference residue with the same column index at the time of anchoring. If elements are removed from the alignment, gaps are added before anchors to maintain alignment. If any other modifications are made to the alignment that would break an anchor, an exception is raised. However, calling code can temporarily take responsibility for maintaining the anchors within the `suspendAnchors` context. :param residues: Residues to anchor. :type residues: list(residue.Residue) """ self._assertCanAnchor(residues) requested_residues = set(residues) ref_seq = self.getReferenceSeq() should_emit_signal = False residues_to_anchor = [] for res in requested_residues: if res in ref_seq: # If the reference residue is the only residue in the column # in `requested_residues`, add all of the column residues to # `residues_to_anchor`. ref_idx = res.idx_in_seq col_elems = self.getColumn(ref_idx, omit_gaps=False)[1:] if not any(elem in requested_residues for elem in col_elems): def is_gap(elem): is_terminal_gap = elem is None is_regular_gap = not is_terminal_gap and elem.is_gap return is_terminal_gap or is_regular_gap residues_to_anchor.extend( elem for elem in col_elems if not is_gap(elem)) elif res.is_res: residues_to_anchor.append(res) for res in residues_to_anchor: if res not in self._anchored_residues[res.sequence]: self._anchored_residues[res.sequence].add(res) should_emit_signal = True if should_emit_signal: self.signals.anchoredResiduesChanged.emit()
def _anchorResidueIndices(self, indices): """ Anchor the specified residues. :param indices: A list of (sequence index, residue index) tuples for all residues to anchor. :type indices: list[tuple(int, int)] """ anchor_residues = [ self[seq_idx][res_idx] for seq_idx, res_idx in indices ] self.anchorResidues(anchor_residues)
[docs] @msv_utils.const def getAnchoredResidues(self): """ :return: A frozenset of residues that are currently anchored. :rtype: frozenset(residue.Residue) """ # Return a read-only set of all anchored residues. if self._RO_anchored_residues is None: all_anchored_residues = itertools.chain( *self._anchored_residues.values()) self._RO_anchored_residues = frozenset(all_anchored_residues) return self._RO_anchored_residues
@msv_utils.const def _getAnchoredResidueIndices(self): """ Return [sequence index, residue index] for all anchored residues. :rtype: list[list(int, int)] """ residues = self.getAnchoredResidues() return list(map(list, self.getResidueIndices(residues)))
[docs] @msv_utils.const def getAnchoredResiduesWithRef(self): """ :return: A frozenset of residues that are currently anchored with the corresponding reference sequence residues :rtype: frozenset(residue.Residue) """ if len(self) == 0: return set() if self._RO_anchored_residues_with_ref is None: self._RO_anchored_residues_with_ref = \ frozenset(itertools.chain.from_iterable(self._getAnchorPairs())) return self._RO_anchored_residues_with_ref
[docs] def clearAnchors(self): self._clearAnchorsFromSeqs(0, len(self))
@QtCore.pyqtSlot(int, int) def _clearAnchorsFromSeqs(self, start_seq_idx, end_seq_idx): """ Clear all anchors from self[start_seq_idx:end_seq_idx+1] """ should_emit_signal = False for seq in self._sequences[start_seq_idx:end_seq_idx + 1]: if seq in self._anchored_residues: residues = self._anchored_residues.pop(seq) if residues: should_emit_signal = True if should_emit_signal: self.signals.anchoredResiduesChanged.emit()
[docs] def removeAnchors(self, residues): """ Unanchor residues. If passed reference residues, all residues anchored to those reference residues will be unanchored. Any given unanchored residues will be ignored. :param residues: The residues to unanchor. :type residues: iterable(residue.Residue) """ residues = list(residues) residues_to_unanchor = [] ref_seq = self.getReferenceSeq() for res in residues: if res in ref_seq: ref_idx = res.idx_in_seq col_elems = self.getColumn(ref_idx, omit_gaps=True) residues_to_unanchor.extend(col_elems[1:]) # omit ref res else: residues_to_unanchor.append(res) should_emit_signal = False if residues_to_unanchor: for res in residues_to_unanchor: try: self._anchored_residues[res.sequence].remove(res) should_emit_signal = True except KeyError: pass if should_emit_signal: self.signals.anchoredResiduesChanged.emit()
@msv_utils.const def _assertCanInsert(self, seq, insert_indices): """ Raise an exception if can't insert due to anchor :param seq: Sequence where insertion will occur :type seq: sequence.Sequence :param insert_indices: Indices of insertion (must be non-empty). :type insert_indices: iterable(int) """ if seq == self.getReferenceSeq(): seq_anchors = self.getAnchoredResidues() else: seq_anchors = self._anchored_residues.get(seq, []) if seq_anchors: first_idx = min(insert_indices) anchor_ok = all(res.idx_in_seq < first_idx for res in seq_anchors) if not anchor_ok: err_msg = ("Adding elements before an anchored residue will " "break the anchoring.") raise AnchoredResidueError(err_msg) ############################################################################ # Subalignments ############################################################################
[docs] @msv_utils.const def getSubalignment(self, start, end): """ Return another alignment containing the elements within the specified start and end indices :param start: The index at which the subalignment should start :type start: int :param end: The index at which the subalignment should end (exclusive) :type end: int :return: An alignment corresponding to the start and end points specified :rtype: BaseAligment """ AlignmentClass = self.__class__ seqs = [seq.getSubsequence(start, end) for seq in self._sequences] aln = AlignmentClass(seqs) return aln
[docs] @msv_utils.const def getDiscontinuousSubalignment(self, indices): """ Given a list of indices, return a new alignment of sequences made up of the residues at those specified indices within this alignment. :param indices: List of (seq index, residue index) tuples :type indices: list of (int, int) :return: A new subalignment :rtype: BaseAlignment """ indices.sort() res_range = [idx[1] for idx in indices] min_i = min(res_range) max_i = max(res_range) AlignmentClass = self.__class__ seqs = [] for seq_idx, group in itertools.groupby(indices, lambda idx: idx[0]): seq = self._sequences[seq_idx] SeqClass = seq.__class__ res_idxs = [idx[1] for idx in group] res_list = [] for ridx in range(min_i, max_i + 1): res = seq[ridx] if ridx in res_idxs else None res_list.append(res) seqs.append( SeqClass(res_list, name=seq.name, long_name=seq.long_name, chain=seq.chain)) aln = AlignmentClass(seqs) return aln
@msv_utils.const def _assertCanRemoveSubalignment(self, start, end): removed = [] for seq in self: removed.extend(seq[start:end]) self._assertCanRemove(removed)
[docs] def removeSubalignment(self, start, end): """ Remove a block of the subalignment from the start to end points. :param start: The start index of the columns to remove :type start: int :param end: The end index of the columns to remove (exclusive) :type end: int """ self._assertCanRemoveSubalignment(start, end) removed = [] for seq in self: removed.extend(seq[start:end]) with self.suspendAnchors(): self.removeElements(removed)
@property def is_rectangular(self): return all(len(seq) == self.num_columns for seq in self) def _assertRectangular(self, other_aln): """ :raises ValueError: if either this or the other alignment are not rectangular """ if not other_aln.is_rectangular: msg = "The other alignment must be rectangular" raise ValueError(msg) if not self.is_rectangular: msg = "This alignment must be rectangular to modify a subalignment" raise ValueError(msg)
[docs] def insertSubalignment(self, aln, start): """ Insert an alignment into the current alignment at the specified index :param aln: The alignment to insert :type aln: `BaseAlignment` :param start: The index at which to insert the alignment :type start: int :raises ValueError: if either alignment is not rectangular """ self._insertSubalignment(aln, start, require_rectangular=True)
@msv_utils.const def _insertSubalignment(self, aln, start, require_rectangular=True): if len(aln) != len(self): msg = ("Attempting to insert alignment with %d sequences into an " "alignment with %d sequences", (len(aln), len(self))) raise ValueError(msg) if require_rectangular: self._assertRectangular(aln) with self.suspendAnchors(): for seq, insert_seq in zip(self, aln): self.addElements(seq, start, insert_seq)
[docs] def replaceSubalignment(self, aln, start, end): """ Replace a subsection of the alignment indicated by start and end indices with the specified alignment :param aln: The alignment to insert :type aln: `BaseAlignment` :param start: The starting index of the subsection to replace. :type start: int :param end: The ending index of the subsection to replace. :type end: int :raises ValueError: if either alignment is not rectangular """ self._assertRectangular(aln) self.removeSubalignment(start, end) self.insertSubalignment(aln, start)
############################################################################ # Gaps ############################################################################
[docs] @msv_utils.const def getGaps(self): """ Returns a list of list of gaps. :return: list(list(residue.Gap)) :rtype: list """ return [seq.getGaps() for seq in self]
[docs] @msv_utils.const def getTerminalGaps(self): """ Returns the terminal gaps in all the sequences :rtype: list :return: list(list(residue.Gap)) """ return [seq.getTerminalGaps() for seq in self]
[docs] def removeAllGaps(self): """ Removes all the gaps of the sequences in the alignment. """ elems_to_remove = [elem for seq in self for elem in seq if elem.is_gap] if not elems_to_remove: return if self.getAnchoredResidues(): raise AnchoredResidueError( "Can't removeAllGaps while there is an " "anchored residue. Clear all anchors and try " "again.") self.signals.residuesAboutToBeRemoved.emit(elems_to_remove) for seq in self: seq.removeAllGaps() self.signals.residuesRemoved.emit()
[docs] def removeTerminalGaps(self): """ Removes the gaps from the ends of every sequence in the alignment """ for seq in self: seq.removeTerminalGaps()
@msv_utils.const def _validateGapBeforeIndices(self, gap_indices): """ Check whether inserting the gaps would break anchors. Intended for gap indices before the insertion, as passed to `addGapsBeforeIndices`. See `_validateGapIndices` for additional documentation. """ self._validateGapIndices( [sequence.offset_indices(indices) for indices in gap_indices]) @msv_utils.const def _validateGapIndices(self, gap_indices): """ Check whether inserting the gaps would break anchors. Intended for gap indices after the insertion, as passed to `addGapsByIndices`. :param gap_indices: A list of lists of gap indices :type gap_indices: list[list[int]] :raises ValueError: If `gap_indices` is the wrong length or contains indices that will be more than one column past the end of a sequence. :raises AnchoredResidueError: If any gap index is before an anchored col """ n_seqs = len(self) if len(gap_indices) != n_seqs: msg = ("The number of gap_indices passed to addGaps " f"({len(gap_indices)}) should match the number of sequences " f"in the alignment ({n_seqs})") raise ValueError(msg) for cur_gap_indices, seq in zip(gap_indices, self): seq.validateGapIndices(cur_gap_indices) seq_idxs_with_gaps = collections.defaultdict(list) for seq_idx, gaps in enumerate(gap_indices): for col_idx in gaps: seq_idxs_with_gaps[col_idx].append(seq_idx) broken_anchors = set() n_gaps_per_seq = [0] * n_seqs unbroken_seq_idxs = set(range(1, n_seqs)) for col_idx, seq_idxs in sorted(seq_idxs_with_gaps.items()): prev_n_gaps_per_seq = n_gaps_per_seq.copy() # Update running total of gaps added per seq for seq_idx in seq_idxs: n_gaps_per_seq[seq_idx] += 1 n_ref_gaps = n_gaps_per_seq[0] for seq_idx in sorted(unbroken_seq_idxs): n_nonref_gaps = n_gaps_per_seq[seq_idx] # To keep anchors aligned, equal numbers of gaps must be added # to the ref and non-ref seq. insertion_allowed = bool(n_nonref_gaps == n_ref_gaps) if insertion_allowed: continue # If the insertion is uneven, all downstream anchors for the # current non-ref seq will be broken. unbroken_seq_idxs.remove(seq_idx) # Account for non-anchor-breaking upstream gap insertions gap_offset = prev_n_gaps_per_seq[seq_idx] msg = ("Previous gap offsets must be equal, otherwise this " "seq was broken in a previous column and shouldn't have " "been checked again") assert gap_offset == prev_n_gaps_per_seq[0], msg offset_col_idx = col_idx - gap_offset downstream_anchors = self._getDownstreamAnchors( seq_idx, offset_col_idx) broken_anchors.update(downstream_anchors) if broken_anchors: err_msg = ("Adding elements before an anchored residue will " "break the anchoring.") raise AnchoredResidueError(err_msg, broken_anchors) @msv_utils.const def _getDownstreamAnchors(self, seq_idx, col_idx): """ Get anchors at or downstream of the given column. :param seq_idx: Index of sequence to get anchors for :type seq_idx: int :param col_idx: Index of column to get anchors for :type col_idx: int :return: Anchors downstream of `col_idx` in the sequence :rtype: list[residue.Residue] """ seq = self[seq_idx] anchored_res = self._anchored_residues.get(seq, set()) return [res for res in anchored_res if res.idx_in_seq >= col_idx]
[docs] def addGapsByIndices(self, gap_indices): """ Adds gaps to the alignment :note: the length of the gap_indices list must match the number of sequences in the alignment. :param gap_indices: A list of lists of gap indices, one for each sequence in the alignment. Note that these indices are based on residue/gap numbering *after* the insertion. To insert gaps using indices based on numbering before the insertion, see `addGapsBeforeIndices`. :type gap_indices: list[list[int]] :raises ValueError: if `gap_indices` is the wrong length :raises AnchoredResidueError: if any gap index is before an anchored col """ self._validateGapIndices(gap_indices) for seq, indices in zip(self._sequences, gap_indices): seq.addGapsByIndices(indices)
[docs] def addGapsBeforeIndices(self, gap_indices): """ Add one gap to the alignment before each of the specified residue positions. :note: the length of the gap_indices list must match the number of sequences in the alignment. :param gap_indices: A list of lists of indices to insert gaps before, one for each sequence in the alignment. Note that these indices are based on residue/gap numbering *before* the insertion. To insert gaps using indices based on numbering after the insertion, see `addGapsByIndices`. """ self._validateGapBeforeIndices(gap_indices) for seq, cur_indices in zip(self._sequences, gap_indices): seq.addGapsBeforeIndices(cur_indices)
[docs] def padAlignment(self): """ Insert gaps into an alignment so that it forms a rectangular block """ pad_gaps = [] length = self.num_columns for seq in self: seq_len = len(seq) gap_locations = list(range(seq_len, length)) pad_gaps.append(gap_locations) self.addGapsByIndices(pad_gaps)
[docs] @msv_utils.const def getGapOnlyColumns(self): """ For each sequence, return a list of the indices in that sequence for which the entire alignment contains gaps. (Indices will be omitted for a sequence if the sequence is shorter than the index.) :return: List of list of indices :rtype: list[list[int]] """ def all_gaps(col): # Empty column means all gaps return len(col) == 0 gap_indices = [ i for (i, col) in enumerate(self.columns(omit_gaps=True)) if all_gaps(col) ] truncated_indices = [] for seq in self: # filter out gap indices beyond the length of the seq seq_gaps = [i for i in gap_indices if i < len(seq)] truncated_indices.append(seq_gaps) return truncated_indices
[docs] def minimizeAlignment(self): """ Minimizes the alignment, i.e. removes all gaps from the gap-only columns. """ gap_only_columns = self.getGapOnlyColumns() for seq, indices, in zip(self, gap_only_columns): # Make sure that we're not requesting the removal of a gap that # doesn't exist. This can happen when minimizing an alignment # in which the sequences are different lengths. gaps = [seq[i] for i in indices if i < len(seq)] seq.removeElements(gaps)
[docs] @msv_utils.const def getAlignmentMinimizedWithSpaces(self): """ This method returns a new alignment and removes gap only columns however it leaves one gap column between blocks :returns: the new, minimized alignment :rtype: BaseAlignment """ new_aln = copy.deepcopy(self) gap_only_cols = new_aln.getGapOnlyColumns() # Process each sequence separately, because sequences may have # unequal number of residues for seq, gaps in zip(new_aln, gap_only_cols): if len(gaps): gap_groups = get_contiguous_groups(gaps) to_remove = [] # If the alignment has a leading gap-only column, remove it if gap_groups[0][0] == 0: to_remove.append(0) for group in gap_groups: to_remove.extend(group[1:]) if len(to_remove): seq.removeElements([seq[i] for i in to_remove]) return new_aln
############################################################################ # Columns ############################################################################
[docs] @msv_utils.const def getColumn(self, index, omit_gaps=False): """ Returns single alignment column at index position. Optionally, filters out gaps if omit_gaps is True. :param index: The index in the alignment :type index: int :param omit_gaps: Whether to omit the gaps :type omit_gaps: bool :return: Single alignment column at index position. Returns None to represent terminal gaps. :rtype: tuple(residue.Residue or residue.Gap or None) """ column = [] for seq in self: elem = None if index < len(seq): elem = seq[index] if not omit_gaps or (elem is not None and elem.is_res): column.append(elem) return tuple(column)
[docs] @msv_utils.const def columns(self, omit_gaps=False, *, match_type=False): """ A generator over all columns. :param omit_gaps: Whether to omit gaps :type omit_gaps: bool :param match_type: Whether to match reference sequence type :type match_type: bool """ if omit_gaps: def keep_func(elem): return elem is not None and elem.is_res else: def keep_func(elem): return elem is not None if match_type: seqs = list(self.getSeqsMatchingRefType()) # Make sure we yield a tuple for every column seqs.append(itertools.repeat(None, self.num_columns)) else: seqs = self for col in itertools.zip_longest(*seqs): yield tuple(elem for elem in col if keep_func(elem))
[docs] @msv_utils.const def seqMatchesRefType(self, seq): if self._ref_type_cache is None: # TODO MSV-1504 once nucleotide sequence doesn't inherit protein # sequence, use isinstance instead of type check self._ref_type_cache = type(self.getReferenceSeq()) return type(seq) == self._ref_type_cache
[docs] @msv_utils.const def getSeqsMatchingRefType(self): if self._seqs_matching_ref_type_cache is None: self._seqs_matching_ref_type_cache = tuple( seq for seq in self if self.seqMatchesRefType(seq)) return self._seqs_matching_ref_type_cache
def _updateColumnSameResCache(self): """ Update the cache of whether each column has all the same residue. False if any residue in the column is unknown or the column is mixed type (protein/nucleic acids). """ self._col_same_res_cache = [False] * self.num_columns for index, column in enumerate(self.columns(omit_gaps=True)): if self._getColumnHasSameRes(column): self._col_same_res_cache[index] = True def _getColumnHasSameRes(self, column): col_set = set() # To make sure we aren't mixing protein and nucleic acid residues seen_types = set() for res in column: if res.type in {residue.UNKNOWN, residue.UNKNOWN_NA}: return False seen_types.add(type(res.type)) if len(seen_types) > 1: return False if res.type.nonstandard: col_set.add(res.long_code) else: col_set.add(res.short_code) if len(col_set) > 1: return False return len(col_set) == 1 def _updateResidueSimilarityCache(self): """ Update the cache of residue similarities """ self._res_similarity_cache = {} ref_seq = self.getReferenceSeq() if ref_seq is None: return for seq in self: for ref_res, res in itertools.zip_longest(ref_seq, seq): if not res: continue elif not ref_res or ref_res.is_gap or res.is_gap: sim = ResidueSimilarity.NA elif res.getIdentityStrict(ref_res): sim = ResidueSimilarity.Identical elif res.getBinarySimilarity(ref_res): sim = ResidueSimilarity.Similar else: sim = ResidueSimilarity.Dissimilar self._res_similarity_cache[res] = sim
[docs] @msv_utils.const def columnHasAllSameResidues(self, index): """ Return whether or not the column at a specified index has all the same residues (excluding gaps). Note that if any unknown residues are present, the column will not be considered to be of all the same residue type. :param index: Index to check for uniformity :type index: int :return: True if the column is of uniform identity, False otherwise. :rtype: bool """ if self._col_same_res_cache is None: self._updateColumnSameResCache() return self._col_same_res_cache[index]
[docs] @msv_utils.const def getResidueSimilarity(self, res): """ Return the similarity score of a residue to the current reference residue at the residues position in the alignment. :param res: Residue to get the similarity score for :type res: `residue.Residue` :return: Similarity score for this residue :rtype: float or None """ if self._res_similarity_cache is None: self._updateResidueSimilarityCache() return self._res_similarity_cache[res]
[docs] @msv_utils.const def elementsToContiguousColumns(self, elements, invert=False, additional_breaks=None, last_col=None): """ Get elements marking contiguous columns containing any of the passed elements :param elements: Elements to convert to columns :type elements: iterable(AbstractSequenceElement) :param invert: Whether to invert logic (i.e. return columns not containing the passed elements) :type invert: bool :param additional_breaks: If given, contiguous columns will be broken at the specified indices. I.e., no contiguous set of columns will contain both column i and column i-1. :type additional_breaks: list[int] or None :param last_col: If given, the last column to consider when constructing contiguous columns. It not given, all columns will be considered. :type last_col: int or None :return: [start, end] elements of contiguous columns. Will be from the ref sequence unless the ref sequence is shorter than `num_columns` :rtype: iterable(tuple(AbstractSequenceElement, AbstractSequenceElement)) """ elements = set(elements) if additional_breaks is None: additional_breaks = set() else: additional_breaks = set(additional_breaks) start = None end = None columns = itertools.islice(self.columns(), last_col) for i, column_elems in enumerate(columns): if i in additional_breaks and start is not None: yield start, end start = None end = None if invert: match = all(ele not in elements for ele in column_elems) else: match = any(ele in elements for ele in column_elems) if match: column_residues = (ele for ele in column_elems if ele.is_res) cur_ele = next(column_residues, None) if cur_ele is None: raise RuntimeError("Call aln.minimizeAlignment first") if start is None: start = cur_ele end = cur_ele elif start is not None: yield start, end start = None end = None if start is not None: yield start, end
############################################################################ # Utilities ############################################################################
[docs] @QtCore.pyqtSlot() @msv_utils.const def clearAllCaching(self): self._resetCaches() self.annotations.clearAllCaching() for seq in self._sequences: seq.clearAllCaching()
@msv_utils.const def _getMoveSequenceAfterIndices(self, dest_seq, seqs_to_move): """ Figure out the rearrangements required to move sequences in the alignment. Note that the reference sequence cannot be moved by this method. If present in `seqs_to_move`, it will be ignored. :param dest_seq: The sequence to place `seqs_to_move` after. :type dest_seq: sequence.Sequence :param seqs_to_move: A list of all sequences to move. :type seqs_to_move: list[sequence.Sequence] :return: A list of new indices to accomplish the specified move. :rtype: list[int] """ if not seqs_to_move: raise ValueError("No sequences to move") dest_index = self.index(dest_seq) + 1 indices_to_move = sorted(map(self.index, seqs_to_move)) if indices_to_move[0] == 0: # don't allow the reference sequence to be dragged del indices_to_move[0] if not indices_to_move: # Raise an exception if the reference sequence was the *only* # sequence specified. (Otherwise, we just ignore it.) raise ValueError("Cannot move the reference sequence") dest_index -= sum(1 for index in indices_to_move if index < dest_index) new_indices = [i for i in range(len(self)) if i not in indices_to_move] new_indices[dest_index:dest_index] = indices_to_move return new_indices @msv_utils.const def _getAddSeqsToAlnSetOrdering(self, seqs_to_add, set_name): """ Figure out how to rearrange the alignment to move the specified sequences into the given alignment set. :param seqs_to_add: The sequences to add to the alignment set. Must not be empty and must not contains any sequences that are already in the alignment set being added to. :type seqs_to_add: Iterable[sequence.Sequence] :param set_name: The name of the alignment set to add to :type set_name: str :return: A list of new indices to accomplish the desired rearrangement. :rtype: list[int] """ seqs_to_add = set(seqs_to_add) ref_seq = self.getReferenceSeq() if set_name in self._name_to_set: aln_set = self._name_to_set[set_name] if ref_seq in seqs_to_add: # We're adding the reference sequence to the set, so move the # set to the top of the alignment new_indices = self._alnSetOrderingRefSeq( ref_seq, seqs_to_add, aln_set) else: last_seq_in_set = max(aln_set, key=self.index) new_indices = self._getMoveSequenceAfterIndices( last_seq_in_set, seqs_to_add) else: # this is a new set, so move all sequences to the location of the # first added sequence first_seq_to_add = min(seqs_to_add, key=self.index) prev_set = self._seq_to_set.get(first_seq_to_add) if prev_set is None: seqs_to_add -= {first_seq_to_add} if seqs_to_add: new_indices = self._getMoveSequenceAfterIndices( first_seq_to_add, seqs_to_add) else: # we don't need to do any rearranging new_indices = list(range(len(self))) elif first_seq_to_add is ref_seq: # we're changing the set of the reference sequence, so move all # of the sequences to add to the top of the alignment new_indices = self._alnSetOrderingRefSeq( ref_seq, seqs_to_add, []) else: # the first added sequence is already in a set, so we need to # move it to the end of the set it's already in new_set_indices = set(map(self.index, seqs_to_add)) last_index_from_prev_set = max(map(self.index, prev_set)) new_indices = list(i for i in range(last_index_from_prev_set + 1) if i not in new_set_indices) new_indices.extend(sorted(new_set_indices)) already_placed = set(new_indices) new_indices.extend( i for i in range(len(self)) if i not in already_placed) return new_indices def _alnSetOrderingRefSeq(self, ref_seq, seqs_to_add, aln_set): """ Figure out how to rearrange the alignment to move the specified sequences, which contain the reference sequence, into a new or existing alignment set. :param ref_seq: The reference sequence :type ref_seq: sequence.Sequence :param seqs_to_add: The sequences to add to the alignment set. Must contain the reference sequence and must not contains any sequences that are already in the alignment set being added to. :type seqs_to_add: Iterable[sequence.Sequence] :param aln_set: An iterable of sequences already in the alignment set, if any. :type aln_set: Iterable(sequence.Sequence) """ new_indices = [0] indices_in_set = sorted(map(self.index, aln_set)) new_indices.extend(indices_in_set) indices_to_add = sorted(map(self.index, seqs_to_add - {ref_seq})) new_indices.extend(indices_to_add) already_placed = set(new_indices) new_indices.extend( i for i in range(len(self)) if i not in already_placed) return new_indices
[docs] def addSeqsToAlnSet(self, seqs, set_name): """ Add all given sequences to the specified alignment set (i.e. a named group of sequences that are always kept together in the alignment). Sequences already in the set will be ignored. All other sequences will be moved to the end of the set. (Except for the reference sequence: The specified set will be moved to the top of the alignment if the reference sequence is added.) :param seqs: The sequences to add to the set. :type seqs: Iterable[sequence.Sequence] :param set_name: The name of the set to add the sequences to. If no set of this name exists, one will be created. :type set_name: str """ seqs = set(seqs) if set_name in self._name_to_set: seqs.difference_update(self._name_to_set[set_name]) if not seqs: return new_indices = self._getAddSeqsToAlnSetOrdering(seqs, set_name) self.reorderSequences(new_indices) self._addSeqsToAlnSetNoReordering(seqs, set_name)
def _addSeqsToAlnSetNoReordering(self, seqs, set_name): """ Add all given sequences to the specified alignment set (i.e. a named group of sequences that are always kept together in the alignment). Sequences will not be moved in the alingment. :param seqs: The sequences to add to the set. :type seqs: Iterable[sequence.Sequence] :param set_name: The name of the set to add the sequences to. If no set of this name exists, one will be created. :type set_name: str """ alignment_set = self._name_to_set.get(set_name) if alignment_set is None: alignment_set = AlignmentSet(set_name, self._set_id_counter) self._set_id_counter += 1 self._name_to_set[set_name] = alignment_set for cur_seq in seqs: old_set = self._seq_to_set.get(cur_seq) if old_set is alignment_set: # The sequence is already part of the requested set, so there's # nothing to do continue elif old_set is not None: self._removeSeqsFromAlnSetNoReordering([cur_seq]) alignment_set.add(cur_seq) self._seq_to_set[cur_seq] = alignment_set self.signals.alnSetChanged.emit()
[docs] def removeSeqsFromAlnSet(self, seqs): """ Remove all given sequences from any alignment sets they're part of. Sequences not in a set will be ignored. All other sequences will be moved to the end of the set that they were in. :param seqs: The sequences to remove from alignment sets. :type seqs: Iterable[sequence.Sequence] """ # ignore any sequences that aren't in a set seqs = set(seqs).intersection(self._seq_to_set.keys()) if not seqs: # nothing to remove return reordering = self._getRemoveFromAlnSetOrdering(seqs) self.reorderSequences(reordering) self._removeSeqsFromAlnSetNoReordering(seqs)
@msv_utils.const def _getRemoveFromAlnSetOrdering(self, seqs_to_remove): """ Figure out how to rearrange the alignment to move the specified sequences out of their current alignment set. :param seqs_to_remove: The sequences to remove from alignment sets. Must not contain any sequences that are not in an alignment set. :type seqs_to_remove: Iterable[sequence.Sequence] :return: A list of new indices to accomplish the desired rearrangement. :rtype: list[int] """ seqs_to_remove = set(seqs_to_remove) # never move the reference sequence ordering = [0] cur_to_remove = [] prev_set = None for i, cur_seq in enumerate(self[1:], start=1): cur_set = self.alnSetForSeq(cur_seq) if prev_set is not None and cur_set is not prev_set: # We've reached the end of a set, so put all sequences that we # removed from the set here. ordering.extend(cur_to_remove) cur_to_remove.clear() if cur_seq in seqs_to_remove: cur_to_remove.append(i) else: ordering.append(i) prev_set = cur_set ordering.extend(cur_to_remove) return ordering def _removeSeqsFromAlnSetNoReordering(self, seqs): """ Remove all given sequences from any alignment sets they're part of. Sequences will not be moved in the alignment. :param seqs: The sequences to remove from alignment sets. :type seqs: Iterable[sequence.Sequence] """ should_emit_signal = False for cur_seq in seqs: alignment_set = self._seq_to_set.get(cur_seq) if alignment_set is not None: should_emit_signal = True del self._seq_to_set[cur_seq] alignment_set.remove(cur_seq) if not alignment_set: # remove the set if it's empty del self._name_to_set[alignment_set.name] if should_emit_signal: self.signals.alnSetChanged.emit()
[docs] def renameAlnSet(self, old_name, new_name): """ Rename the specified alignment set. :param old_name: The old name of the alignment set. :type old_name: str :param new_name: The new name of the alignment set. :type new_name: str """ if new_name in self._name_to_set: raise ValueError(f"Set {new_name} already present.") try: alignment_set = self._name_to_set[old_name] except KeyError: raise ValueError(f"No set {old_name}") del self._name_to_set[old_name] alignment_set.name = new_name self._name_to_set[new_name] = alignment_set self.signals.alnSetChanged.emit()
[docs] @msv_utils.const def alnSetForSeq(self, seq): """ Return the alignment set that contains the given sequence. :param seq: The sequence to retrieve the alignment set for. :type seq: sequence.Sequence :return: The requested set. The calling scope must not modify the returned value. Will return None if `seq` is not part of any set. :rtype: AlignmentSet or None """ return self._seq_to_set.get(seq)
[docs] @msv_utils.const def hasAlnSets(self): """ Does this alignment contain any alignment sets? :rtype: bool """ return bool(self._seq_to_set)
[docs] @msv_utils.const def alnSetNames(self): """ Return all alignment set names. :rtype: set(str) """ return set(self._name_to_set.keys())
[docs] @msv_utils.const def alnSets(self): """ Iterate through all alignment sets. :return: An iterator through all alignment sets. The calling scope must not modify any of the sets. :rtype: dict_keys """ return self._name_to_set.values()
[docs] @msv_utils.const def getAlnSet(self, set_name): """ Return the requested set. :param set_name: The name of the set to retrieve. :type set_name: str :return: The requested set. The calling scope must not modify the returned value. :rtype: AlignmentSet :raise ValueError: If no set with the given name was found. """ try: return self._name_to_set[set_name] except KeyError: raise ValueError(f"No set {set_name}")
def _getAlnSetSeqIdxs(self, seq): """ :return: the indexes of all seqs in `seq`'s aln set, with the idx of `seq` first. :rtype: list[int] """ aln_set = self.alnSetForSeq(seq) seq_idx = self.index(seq) indexes = [seq_idx] if aln_set is not None: set_idxs = {self.index(s) for s in aln_set} set_idxs.remove(seq_idx) indexes.extend(sorted(set_idxs)) return indexes
[docs] def gatherAlnSets(self): if not self.hasAlnSets(): return new_seq_ordering = self._getGatherAlnSetsReordering() self.reorderSequences(new_seq_ordering)
@msv_utils.const def _getGatherAlnSetsReordering(self): new_seq_ordering = [] seen_idxs = set() for idx, seq in enumerate(self): if idx in seen_idxs: continue aln_set_seq_idxs = self._getAlnSetSeqIdxs(seq) new_seq_ordering.extend(aln_set_seq_idxs) seen_idxs.update(aln_set_seq_idxs) return new_seq_ordering
[docs] @msv_utils.const def getFrequencies(self, normalize=True): """ Returns the frequencies of each residue in each column. Residues are sorted by decreasing frequency. Gapped positions are not counted when calculating frequencies. :param normalize: Whether to normalize the values; i.e. divide by the number of non-gaps in the column :type normalize: bool :return: frequencies of each residue in each alignment column :rtype: tuple(tuple(residue.Residue, float or int))) """ for column in self.columns(omit_gaps=True, match_type=True): if not column: yield tuple() continue # Initialize a list of residue strings to pass to Counter and a # dict to associate the residue strings with residue objects. # Standard amino acids are represented using 1 letter code and # nonstandard amino acids are represented using 3 letter code. res_str_list = list() res_str_to_res_dict = dict() for res in column: add_to_dict = True if res.type.nonstandard: res_str = res.long_code else: res_str = res.short_code prev_res = res_str_to_res_dict.get(res_str) if (prev_res is not None and prev_res.type in residue.STD_AMINO_ACIDS): # Don't overwrite a standard amino acid add_to_dict = False res_str_list.append(res_str) if add_to_dict: res_str_to_res_dict[res_str] = res freq_dict = collections.Counter(res_str_list) # Create a new most_common list containing residue objects most_common_res = [] for res_str, freq in freq_dict.most_common(): res = res_str_to_res_dict[res_str] if normalize: freq = freq / len(column) most_common_res.append((res, freq)) yield tuple(most_common_res)
[docs] @msv_utils.const def getResidueSeqProps(self, value_types=None): """ Get a list of all sequence properties that any residue has. If 'value_types' is defined, get only the specific property types listed. :param value_types: list of specific properties types- str, int or float etc as structure.PROP_STRING,structure.PROP_INTEGER etc :type value_types: List :return: All the sequence properties :rtype: list[properties.SequenceProperty] """ seq_props = [] prop_names = set() desc_names = set() # Get all atom structure property names for st in self.all_structures: if len(st.atom): prop_names.update(st.getAtomPropertyNames()) # Get all residue descriptor names for seq in self: for res in seq: if res.is_gap: continue desc_names.update(res.getDescriptorKeys()) for prop_name in prop_names: if value_types is not None and prop_name[0] not in value_types: continue seq_props.append( properties.SequenceProperty( property_name=prop_name, property_source=properties.PropertySource.Residue, property_type=properties.PropertyType.StructureProperty, )) for desc_name in desc_names: seq_props.append( properties.SequenceProperty( property_name=desc_name, property_source=properties.PropertySource.Residue, property_type=properties.PropertyType.Descriptor, )) return seq_props
[docs] @msv_utils.const def getSeqsDescriptors(self): """ Return a list of all the calculated descriptors of the sequences in the alignment. :return: All the sequence descriptors :rtype: list[properties.SequenceProperty] """ seq_descriptors = [] names = set() for seq in self: for prop_source, prop_value in seq.descriptors.items(): for name, value in prop_value.items(): if name in names: continue seq_descriptors.append( properties.SequenceProperty( visible=False, property_name=name, property_source=prop_source, property_type=properties.PropertyType.Descriptor)) names.add(name) return seq_descriptors
@property def all_structures(self): """ Return an iterator over all sequence structures in the alignment. This does not repeat structures that belong to multiple sequences. """ entry_ids = set() for seq in self: # Dont return the same structure twice eid = seq.entry_id if seq.entry_id in entry_ids: continue entry_ids.add(eid) st = seq.getStructure() if st is None: continue yield st
[docs]class AlignmentSet(set): """ A named group of sequences that are always kept together in the alignment. """
[docs] def __init__(self, name, set_id): """ :param name: The name of the alignment set. :type name: str :param set_id: A unique integer ID for the alignment set. Used to determine the color of the icon and text. :type set_id: int """ super().__init__() self.name = name self.set_id = set_id
class _ProteinAlignment(BaseAlignment): """ A base class for split-chain and combined-chain protein alignments. """ _ALN_ANNOTATION_CLASS = annotation.ProteinAlignmentAnnotations _SEQ_ANNOTATION_CLASS = annotation.ProteinSequenceAnnotations @property def disulfide_bonds(self): return tuple(set().union(*(set(seq.disulfide_bonds) for seq in self))) @property def pred_disulfide_bonds(self): return tuple( set().union(*(set(seq.pred_disulfide_bonds) for seq in self))) @msv_utils.const def _getInvalidatedBonds(self, seqs): """ Return any inter-sequence bonds between a sequence in `seqs` and a sequence not in `seqs`. In other words, remove any inter-sequence bonds that will no longer be valid after we remove `seqs` from the alignment. :param seqs: The sequences to remove inter-sequence bonds from :type seqs: set[sequence.Sequence] :return: A tuple of: - invalidated known disulfide bonds - invalidated predicted disulfide bonds :rtype: tuple(list[residue.DisulfideBond], list[residue.DisulfideBond]) """ raise NotImplementedError def _restoreInvalidatedBonds(self, invalidated_known_bonds, invalidated_pred_bonds): """ Restore the invalidated bonds returned by `_getInvalidatedBonds` :param invalidated_known_bonds: Known disulfide bonds to restore :type invalidated_known_bonds: list[residue.DisulfideBond] :param invalidated_pred_bonds: Predicted disulfide bonds to restore :type invalidated_pred_bonds: list[residue.DisulfideBond] """ raise NotImplementedError
[docs]class ProteinAlignment(json.JsonableClassMixin, _ProteinAlignment):
[docs] @msv_utils.const def toJsonImplementation(self): anchor_res_indices = self._getAnchoredResidueIndices() interseq_bonds = [ bond for bond in self.disulfide_bonds if bond.is_inter_sequence ] bond_idxs = [[[self.index(res1.sequence), res1.idx_in_seq], [self.index(res2.sequence), res2.idx_in_seq]] for res1, res2 in interseq_bonds] aln_sets = [] for aln_set in self._name_to_set.values(): seq_indices = [self.index(seq) for seq in aln_set] aln_sets.append([aln_set.name, aln_set.set_id, seq_indices]) return { 'sequences': self._sequences, 'anchored_residues': anchor_res_indices, 'inter_sequence_bond_idxs': bond_idxs, 'aln_sets': aln_sets, }
[docs] @classmethod def fromJsonImplementation(cls, json_obj): seqs = [ sequence.ProteinSequence.fromJson(json_seq) for json_seq in json_obj['sequences'] ] aln = cls(seqs) aln._anchorResidueIndices(json_obj['anchored_residues']) for bond in json_obj['inter_sequence_bond_idxs']: seq_idx1, res_idx1 = bond[0] seq_idx2, res_idx2 = bond[1] aln.addDisulfideBond(aln[seq_idx1][res_idx1], aln[seq_idx2][res_idx2]) for set_name, set_id, seq_idxs in json_obj['aln_sets']: seqs = [aln[idx] for idx in seq_idxs] aln._addSeqsToAlnSetNoReordering(seqs, set_name) aln._name_to_set[set_name].set_id = set_id return aln
[docs] @json.adapter(version=48002) def adapter48002(cls, json_dict): json_dict['aln_sets'] = [] return json_dict
[docs] def addDisulfideBond(self, res1, res2, known=True): """ Add a disulfide bond if both residues' sequences are in the alignment :param res1: A residue to link with a disulfide bond :type res1: residue.Residue :param res2: Another residue to link with a disulfide bond :type res2: residue.Residue :param known: Whether the bond is known or predicted :type known: bool :raise ValueError: if either sequence is not in the alignment """ seqs = {r.sequence for r in (res1, res2)} if any(seq not in self for seq in seqs): raise ValueError(("Cannot add a disulfide bond to residues from " "sequence(s) that are not in the alignment")) residue.add_disulfide_bond(res1, res2, known=known)
[docs] def removeDisulfideBond(self, bond): """ Disconnect a disulfide bond. The bond may be either known or predicted. :param bond: The bond to disconnect :type bond: residue.DisulfideBond :raise ValueError: if either sequence is not in the alignment """ seqs = {r.sequence for r in bond} if any(seq not in self for seq in seqs): raise ValueError( ("Cannot remove a disulfide bond from residues from " "sequence(s) that are not in the alignment")) residue.remove_disulfide_bond(bond)
def __deepcopy__(self, memo): aln = super().__deepcopy__(memo) self._copyInterchainDisulfidesTo(aln) return aln @msv_utils.const def _copyInterchainDisulfidesTo(self, aln): """ Copy all interchain disulfide bonds (known and predicted) from this alignment to the given alignment. :param aln: The alignment to copy disulfides to. :type aln: ProteinAlignment """ mapped_bonds = [ self._mapResidues(bond, aln) for bond in self.disulfide_bonds if bond.is_inter_sequence ] for res1, res2 in mapped_bonds: aln.addDisulfideBond(res1, res2) mapped_pred_bonds = [ self._mapResidues(bond, aln) for bond in self.pred_disulfide_bonds if bond.is_inter_sequence ] for res1, res2 in mapped_pred_bonds: aln.addDisulfideBond(res1, res2, known=False) def _additionalSeqRemovalSteps(self, seqs): # remove invalidated disulfide bonds invalidated_known_bonds, invalidated_pred_bonds = \ self._getInvalidatedBonds(seqs) for bond in invalidated_known_bonds + invalidated_pred_bonds: self.removeDisulfideBond(bond) @msv_utils.const def _getInvalidatedBonds(self, seqs): # See parent class for method documentation invalidated_known_bonds = self._getInvalidatedBondsFromBondList( seqs, self.disulfide_bonds) invalidated_pred_bonds = self._getInvalidatedBondsFromBondList( seqs, self.pred_disulfide_bonds) return invalidated_known_bonds, invalidated_pred_bonds def _getInvalidatedBondsFromBondList(self, seqs, bonds_to_check): invalidated_bonds = [] for bond in bonds_to_check: res1, res2 = bond.res_pair if bool(res1.sequence in seqs) is not bool(res2.sequence in seqs): invalidated_bonds.append(bond) return invalidated_bonds def _restoreInvalidatedBonds(self, invalidated_known_bonds, invalidated_pred_bonds): # See parent class for method documentation for res1, res2 in invalidated_known_bonds: self.addDisulfideBond(res1, res2) for res1, res2 in invalidated_pred_bonds: self.addDisulfideBond(res1, res2, known=False) ############################################################################ # IO Convenience Methods ############################################################################
[docs] @classmethod def fromStructure(cls, ct, eid=None): """ :param ct: The structure to convert :type ct: schrodinger.structure.Structure :param eid: The entry id to assign to the created sequences. If not given, the entry id from the structure, if any, will be used. :type eid: str :rtype: cls :return: An alignment containing the sequences in the structure """ from schrodinger.application.msv import seqio converter = seqio.StructureConverter(ct, eid) sequences = converter.makeSequences() return cls(sequences)
[docs] @classmethod def fromClustalFile(cls, file_name): """ Returns alignment read from file in Clustal .aln format preserving order of sequences. :type file_name: str :param file_name: Source file name. :raises IOError: If output file cannot be read. :return: An alignment :note: The alignment can be empty if no sequence was present in the input file. """ from schrodinger.application.msv import seqio return seqio.ClustalAlignmentReader.read(file_name, cls)
[docs] @msv_utils.const def toClustalFile(self, file_name, use_unique_names=True): """ Writes aln to a Clustal alignment file. :raises IOError: If output file cannot be written. :type file_name: str :param file_name: Destination file name. :param use_unique_names: If True, write unique name for each sequence. :type use_unique_names: bool """ from schrodinger.application.msv import seqio return seqio.ClustalAlignmentWriter.write(self, file_name, use_unique_names)
[docs] @classmethod def fromFastaFile(cls, file_name): """ Returns alignment read from file in Clustal .aln format preserving order of sequences. :raises IOError: If the input file cannot be read. :param file_name: name of input FASTA file :type file_name: str :return: Read alignment. The alignment can be empty if no sequence was present in the input file. :rtype: ProteinAlignment """ from schrodinger.application.msv import seqio return seqio.FastaAlignmentReader.read(file_name, cls)
[docs] @classmethod def fromFastaString(cls, lines): """ Read sequences from FASTA-formatted text, creates sequences and appends them to alignment. Splits sequence name from the FASTA header. :param lines: list of strings representing FASTA file :type lines: list of str :return: The alignment :rtype: ProteinAlignment """ from schrodinger.application.msv import seqio return seqio.FastaAlignmentReader.readFromText(lines, cls)
[docs] @classmethod def fromFastaStringList(cls, strings): """ Return an alignment object created from an iterable of sequence strings :param strings: Sequences as iterable of strings (1D codes) :type strings: Iterable of strings :return: The alignment :rtype: ProteinAlignment """ from schrodinger.application.msv import seqio return seqio.FastaAlignmentReader.readFromStringList(strings, cls)
[docs] @msv_utils.const def toFastaString(self, use_unique_names=True, maxl=50): """ Convert ProteinAlignment object to list of sequence strings :param aln: Alignment data :type aln: `ProteinAlignment` """ from schrodinger.application.msv import seqio return seqio.FastaAlignmentWriter.toString(self, use_unique_names, maxl)
[docs] @msv_utils.const def toFastaStringList(self): """ Convert self to list of fasta sequence strings :rtype: list :return: list of str """ from schrodinger.application.msv import seqio return seqio.FastaAlignmentWriter.toStringList(self)
[docs] @msv_utils.const def toFastaFile(self, file_name, use_unique_names=True, maxl=50): """ Write self to specified FASTA file :raises IOError: If output file cannot be written. """ from schrodinger.application.msv import seqio seqio.FastaAlignmentWriter.write(self, file_name, use_unique_names, maxl)
############################################################################ # Utilities ############################################################################
[docs] @msv_utils.const def findPattern(self, pattern): """ Finds a specified PROSITE pattern in all sequences. :param pattern: PROSITE pattern to search in sequences. See `protein.sequence.find_generalized_pattern` for documentation. :type pattern: str :returns: List of matching residues :rtype: list of `protein.residue.Residue` """ # Encode sequences for pattern search seq_dicts = [ seq.encodeForPatternSearch(with_ss=True, with_flex=True, with_asa=True) for seq in self ] # Perform pattern search # Insert dashes into uppercase letters (e.g. "CMY" -> "C-M-Y") if pattern.isalpha() and pattern == pattern.upper(): pattern = "-".join(pattern) pattern_matches = sequence.find_generalized_pattern( seq_dicts, str(pattern)) if not pattern_matches: return [] # Select specified residues selected_residues = [] for seq_index, match_list in enumerate(pattern_matches): for start, end in match_list: matched_seq = self[seq_index] res_indices = [ i for (i, res) in enumerate(matched_seq) if res.is_res ] # select residues independent of gaps for res_index in res_indices[start:end]: selected_residues.append(matched_seq[res_index]) return selected_residues
[docs]class NucleicAcidAlignment(BaseAlignment): # TODO (MSV-1504): Create proper nucleic acid domain objects pass
class _SeqsFromTheSameProtein: """ Information about sequences from the same protein. Used in `CombinedChainProteinAlignment._combineSequences`. :ivar seqs: A list of list of sequences. No list of sequences contains multiple sequences with the same chain name. :vartype seqs: list(list(sequence.Sequence)) :ivar counts: A dictionary of {chain name: number of sequences with that chain name} :vartype: Counter """ def __init__(self): self.seqs = [] self.counts = collections.Counter()
[docs]class CombinedChainProteinAlignment(_ProteinAlignment): """ An alignment containing combined-chain sequences (`sequence.CombinedChainProteinSequence` objects). """
[docs] def __init__(self, sequences=None, *, chains_to_combine=None): """ :param sequences: A list of split-chain or combined-chain sequences to add to the alignment. If not given, an empty alignment will be created. :type sequences: list[sequence.ProteinSequence] or list[sequence.CombinedChainProteinSequence] :param chains_to_combine: Information about which split-chain sequences in `split_undoable_aln` should be included in which combined-chain sequence. Should be a list of lists of indices. Each index refers to the split-chain sequence at that position of `split_undoable_aln`, and split-chain sequences that are listed together will be combined into the same combined-chain sequence. Each split-chain sequence from `split_undoable_aln` must be referenced exactly once. :type chains_to_combine: list[list[int]] """ self._split_seq_cache = None if chains_to_combine is None: super().__init__(sequences) else: super().__init__() self._combineAndAddSplitSeqs(sequences, chains_to_combine)
[docs] def addSeqs(self, seqs, start=None): """ Add multiple sequences to the alignment. Note that either single-chain sequences or combined-chain sequences may be added (but not both at the same time). :param sequences: Sequences to add :type sequences: list[sequence.ProteinSequence] or list[sequence.CombinedChainProteinSequence] :param start: The index at which to insert; if None, seqs are appended. Must be None if adding single-chain sequences. :type start: int """ if not seqs: return [] elif self._isCombinedChainSeq(seqs[0]): super().addSeqs(seqs, start) self._split_seq_cache = None return seqs elif start is not None: raise RuntimeError("Cannot specify start when adding split-chain " "sequences to a combined-chain alignment.") else: combined_seqs = self._addSplitSeqs(seqs) self._split_seq_cache = None return combined_seqs
@msv_utils.const def _isCombinedChainSeq(self, seq): """ Figure out if the given sequence is a single-chain or a combined-chain sequence. :param seq: The sequence to check. :type seq: sequence.ProteinSequence or sequence.CombinedChainProteinSequence :return: True if the sequence is combined-chain. False if it is single-chain. :rtype: bool """ return isinstance(seq, sequence.CombinedChainProteinSequence) def _addSplitSeqs(self, split_seqs): """ Add single-chain sequences to the alignment. If any of the sequences to add represent new chains of an existing combined-chain sequence, they will be added to the existing sequence. :param split_seqs: The sequences to add :type split_seqs: list[sequence.ProteinSequence] :return: A list of all combined-chain sequences (both new and modified) that contain the added sequences. :rtype: list[sequence.CombinedChainProteinSequence] """ modified_combined_seqs, unmatched_split_seqs = \ self._addNewChainsToExistingSeqs(split_seqs) if unmatched_split_seqs: new_combined_seqs = self._combineSequences(unmatched_split_seqs) super().addSeqs(new_combined_seqs) return modified_combined_seqs + new_combined_seqs else: return modified_combined_seqs def _addNewChainsToExistingSeqs(self, split_seqs): """ Add new single-chain sequences to existing combined-chain sequences. :param split_seqs: The sequences to add :type split_seqs: list[sequence.ProteinSequence] :return: A tuple of: - The combined-chain sequences that had new chains added - The single-chain sequence that were not added to any existing combined-chain sequences. :rtype: tuple(list[sequence.CombinedChainProteinSequence], list[sequence.ProteinSequence]) """ if len(self) == 0: return [], split_seqs existing_seqs = {} for seq in self: key = (seq.entry_id, seq.name) existing_seqs.setdefault(key, []).append(seq) modified_combined_seqs = set() unmatched_split_seqs = [] for seq in split_seqs: matching_prots = existing_seqs.get((seq.entry_id, seq.name)) if not matching_prots: unmatched_split_seqs.append(seq) continue for cur_existing_seq in matching_prots: if not cur_existing_seq.hasChain(seq.chain): modified_combined_seqs.add(cur_existing_seq) cur_existing_seq.addChain(seq) break else: unmatched_split_seqs.append(seq) return list(modified_combined_seqs), unmatched_split_seqs def _combineSequences(self, aln): """ Create combined sequences for all of the given split-chain sequences. :param aln: The split-chain sequences to combine :type aln: Iterable(sequence.ProteinSequence) :return: The combined sequences. Note that this list preserves ordering from `aln`. :rtype: list(sequence.CombinedChainProteinSequence) """ grouped_seqs = [] seq_data = collections.defaultdict(_SeqsFromTheSameProtein) for seq in aln: cur_seq_data = seq_data[seq.entry_id, seq.name] seq_num = cur_seq_data.counts[seq.chain] cur_seq_data.counts[seq.chain] += 1 if seq_num == len(cur_seq_data.seqs): cur_grouped_seqs = [] cur_seq_data.seqs.append(cur_grouped_seqs) grouped_seqs.append(cur_grouped_seqs) else: cur_grouped_seqs = cur_seq_data.seqs[seq_num] cur_grouped_seqs.append(seq) return [ sequence.CombinedChainProteinSequence(seqs) for seqs in grouped_seqs ] def _combineAndAddSplitSeqs(self, chains, indices_to_combine): """ Create new combined-chain sequences from the given split-chain sequences and add the new combined-chain sequences to the alignmment. :param chains: The split-chain sequences to combine and add to the aligment. :type chains: list[sequence.ProteinSequence] :param indices_to_combine: Information about which chains should be included in which combined-chain sequence. Should be a list of lists of indices. Each index refers to the chain at that position of `chains`, and chains that are listed together will be combined into the same combined-chain sequence. :type indices_to_combine: list[list[int]] """ combined_seqs = [] for cur_indices in indices_to_combine: cur_chains = [chains[i] for i in cur_indices] # make sure that we're only grouping together chains that are from # the same protein for chain_to_check in cur_chains[1:]: assert chain_to_check.name == cur_chains[0].name assert chain_to_check.entry_id == cur_chains[0].entry_id seq = sequence.CombinedChainProteinSequence(cur_chains) combined_seqs.append(seq) self.addSeqs(combined_seqs)
[docs] def removeSeqs(self, seqs): """ Remove multiple sequences from the alignment. Note that either single- chain sequences or combined-chain sequences may be added (but not both at the same time). :param sequences: Sequences to remove :type sequences: Iterable[sequence.Sequence] or Iterable[sequence.CombinedChainProteinSequence] """ seqs = list(seqs) if not seqs: return elif self._isCombinedChainSeq(seqs[0]): super().removeSeqs(seqs) else: self._removeSplitSeqs(seqs) self._split_seq_cache = None
def _removeSplitSeqs(self, split_seqs): """ Remove single-chain sequences from the alignment. If all chains of a combined-chain sequence are removed, the combined-chain sequence will be removed from the alignment. :param split_seqs: The sequences to remove :type split_seqs: list[sequence.ProteinSequence] """ split_seqs = set(split_seqs) combined_seqs_to_remove = [] chains_to_remove = [] # Figure out what chains go with which combined-chain sequence for combined_seq in self: cur_chains = split_seqs.intersection(combined_seq.chains) if not cur_chains: continue if len(cur_chains) == len(combined_seq.chains): combined_seqs_to_remove.append(combined_seq) else: chains_to_remove.append((combined_seq, cur_chains)) split_seqs -= cur_chains if split_seqs: raise ValueError("Cannot remove chains that are not in the " "alignment") # actually remove the chains and sequences super().removeSeqs(combined_seqs_to_remove) for combined_seq, chains in chains_to_remove: combined_seq.removeChains(chains)
[docs] @msv_utils.const def combinedSeqForSplitSeq(self, split_seq): """ Get the combined-chain sequence that contains the given split-chain sequence. :param split_seq: The split-chain sequence :type split_seq: sequence.Sequence :return: The combined-chain sequence :rtype: sequence.CombinedChainProteinSequence """ if self._split_seq_cache is None: self._split_seq_cache = { cur_split_seq: combined_seq for combined_seq in self for cur_split_seq in combined_seq.chains } return self._split_seq_cache[split_seq]
[docs] @msv_utils.const def combinedResForSplitRes(self, split_res): """ Get the combined-chain residue for the given split-chain residue. :param res: The split-chain residue :type res: residue.AbstractSequenceElement :return: The combined-chain residue :rtype: residue.CombinedChainResidueWrapper """ combined_seq = self.combinedSeqForSplitSeq(split_res.sequence) return residue.CombinedChainResidueWrapper(split_res, combined_seq)
@msv_utils.const def _assertCanMutateResidues(self, seq_i, start, end, elements): super()._assertCanMutateResidues(seq_i, start, end, elements) # Make sure that the specified indices don't span a chain break self[seq_i].assertCanMutateResidues(start, end)
[docs] @msv_utils.const def getInterChainAnchors(self): """ Return all residues that are anchored to a different chain of the reference sequence (e.g. a residue in the second chain anchored to a reference residue from the first chain). :return: The anchored residues. :rtype: set[residue.Residue] """ inter_chain_anchors = set() ref_seq = self.getReferenceSeq() for ref_res, anchored_res in self._getAnchorPairs(): ref_chain_index = ref_seq.chains.index(ref_res.split_sequence) anc_seq_chains = anchored_res.sequence.chains anc_chain_index = anc_seq_chains.index(anchored_res.split_sequence) if ref_chain_index != anc_chain_index: inter_chain_anchors.add(anchored_res) return inter_chain_anchors
[docs] def alignChainStarts(self): """ Align chain starting positions (e.g. make sure that the start of the N-th chain occurs in the same column for all sequences). This method will add gaps at the starts and/or ends of chains to preserve anchoring. :return: A tuple of: - A list of chain starting indices. This will not include the starting index of the first chain, which is always 0. - The starting index of the first chain for which there's no corresponding reference chain (e.g. the starting index for the third chain if there are only two chains in the reference sequence). This will be None if there are no chains without a corresponding reference chain. :rtype: tuple(list[int], int or None) """ gaps, chain_starts, end_of_ref = self._gapsToAddToAlignChainStarts() self._addGapsToChainStartsAndEnds(gaps) return chain_starts, end_of_ref
@msv_utils.const def _gapsToAddToAlignChainStarts(self): """ Figure out how many gaps need to be added to each chain start and end in order to align the chain starting positions (e.g. make sure that the start of the N-th chain occurs in the same column for all sequences). :return: A tuple of: - The numbers of gaps to add - A list of chain starting indices. This will not include the starting index of the first chain, which is always 0. - The starting index of the first chain for which there's no corresponding reference chain (e.g. the starting index for the third chain if there are only two chains in the reference sequence). This will be None if there are no chains without a corresponding reference chain. :rtype: tuple(GapRegionsForSeqs, list[int], int or None) """ if len(self) == 0: return [], [], None anchor_offsets = self._getAnchorOffsets() gaps_to_add, chain_lengths = self._gapsToAddFromAnchorOffsets( anchor_offsets) chain_starts = list(itertools.accumulate(chain_lengths))[:-1] num_ref_chains = len(self.getReferenceSeq().chains) if num_ref_chains - 1 == len(chain_starts): # There are as many chains in the reference sequence as there are in # the whole alignment end_of_ref = None else: end_of_ref = chain_starts[num_ref_chains - 1] return gaps_to_add, chain_starts, end_of_ref def _getAnchorOffsets(self): """ Determine how many residues we would need to offset the start of each chain relative to the equivalent chain in the reference sequence in order to preserve anchored residues. :return: Offset values given as `anchor_offsets[chain_index][sequence_index] = offset`. Note that the chain index comes first, not the sequence index. If a given chain doesn't contain any anchored residues, the offset will be zero. :rtype: list[list[int]] :raise AnchoredResidueError: If any inter-chain anchors are present. See `getInterChainAnchors` for additional information. """ num_chains = max(len(seq.chains) for seq in self) anchor_offsets = [[0] * len(self) for _ in range(num_chains)] ref_seq = self.getReferenceSeq() for ref_res, anchored_res in self._getAnchorPairs(): ref_chain_index = ref_seq.chains.index(ref_res.split_sequence) ref_res_index = ref_res.split_res.idx_in_seq anc_seq = anchored_res.sequence anc_seq_index = self.index(anc_seq) anc_chain_index = anc_seq.chains.index(anchored_res.split_sequence) anc_res_index = anchored_res.split_res.idx_in_seq if ref_chain_index != anc_chain_index: raise AnchoredResidueError() cur_offset = ref_res_index - anc_res_index anchor_offsets[anc_chain_index][anc_seq_index] = cur_offset return anchor_offsets def _gapsToAddFromAnchorOffsets(self, anchor_offsets): """ Using the anchor offsets returned by `_getAnchorOffsets`, figure out how many gaps need to be added to each chain start and end in order to align the chain starting positions while preserving anchored residues. :param anchor_offsets: How many residues we need to offset the start of each chain relative to the equivalent chain in the reference sequence in order to preserve anchored residues. :type anchor_offsets: list[list[int]] :return: A tuple of: - The number of gaps to add to the start and end of each chain - The length that each chain will be after the gaps have been added. :rtype: tuple(GapRegionsForSeqs, list[int]) """ gaps_to_add = [[None] * len(seq.chains) for seq in self] chain_lengths = [] for chain_i, (chain_anchor_offsets, chains) in enumerate( zip(anchor_offsets, itertools.zip_longest(*(seq.chains for seq in self)))): # figure out how many gaps we're going to have to add before the # start of this chain in the reference sequence ref_start_gaps = -min( (offset for offset in chain_anchor_offsets if offset < 0), default=0) # figure out how many columns this chain is going to take up max_chain_length = max(ref_start_gaps + cur_anchor_offset + len(cur_chain) for cur_chain, cur_anchor_offset in zip( chains, chain_anchor_offsets) if cur_chain is not None) chain_lengths.append(max_chain_length) for seq_i, (cur_seq, cur_chain, cur_anchor_offset) in enumerate( zip(self, chains, chain_anchor_offsets)): if cur_chain is None: # this sequence doesn't have a chain in this position continue start_gaps = ref_start_gaps + cur_anchor_offset if chain_i < len(cur_seq.chains) - 1: end_gaps = max_chain_length - len(cur_chain) - start_gaps else: # there's no need to add gaps to the end of the sequence end_gaps = 0 gaps_to_add[seq_i][chain_i] = sequence.GapRegion( from_start=start_gaps, from_end=end_gaps) return gaps_to_add, chain_lengths def _addGapsToChainStartsAndEnds(self, gaps: sequence.GapRegionsForSeqs): """ Add the specified numbers of gaps to the starts and ends of each chain in each sequence. :param gaps: The numbers of gaps to add """ assert len(gaps) == len(self) with self.suspendAnchors(): for seq, cur_gaps in zip(self, gaps): seq.addGapsToChainStartsAndEnds(cur_gaps) def _removeGapsFromChainStartsAndEnds(self, gaps: sequence.GapRegionsForSeqs): """ Remove the specified numbers of gaps from the starts and ends of each chain in each sequence. :param gaps: The numbers of gaps to remove """ assert len(gaps) == len(self) with self.suspendAnchors(): for seq, cur_gaps in zip(self, gaps): seq.removeGapsFromChainStartsAndEnds(cur_gaps)
[docs] def adjustChainStarts(self, adjust_by): """ Move each chain break position to the right by the specified number of gaps. Note that chain breaks can only be moved along gaps, not residues. :param num_gaps: The number of gaps to move each chain break by, given as `adjust_by[sequence_index][chain_break_index] = adjustment`. Note that no adjustment is given for the start of the first chain or the end of the last chain. :type num_gaps: list[list[int]] :raise AssertionError: If some of the sequence elements to be removed aren't actually gaps. """ gaps_to_remove, gaps_to_add = self._adjustChainStartsToGaps(adjust_by) with self.suspendAnchors(): self._removeGapsFromChainStartsAndEnds(gaps_to_remove) self._addGapsToChainStartsAndEnds(gaps_to_add)
@msv_utils.const def _adjustChainStartsToGaps(self, adjust_by): """ Convert the value passed to `adjustChainStarts` into values that can be passed to `_removeGapsFromChainStartsAndEnds` and `_addGapsToChainStartsAndEnds`. :param num_gaps: The number of gaps to move each chain break by, given as `adjust_by[sequence_index][chain_break_index] = adjustment`. Note that no adjustment is given for the start of the first chain or the end of the last chain. :type num_gaps: list[list[int]] :return: A tuple of gaps to remove and gaps to add. :rtype: tuple(list[list[tuple(int, int)]], list[list[tuple(int, int)]]) """ gaps_to_remove = [] gaps_to_add = [] for seq_adjust_by in adjust_by: seq_gaps_to_remove = [sequence.GapRegion(0, 0)] seq_gaps_to_add = [] for chain_adjust_by in seq_adjust_by: seq_gaps_to_add.append(sequence.GapRegion(0, chain_adjust_by)) seq_gaps_to_remove.append(sequence.GapRegion( chain_adjust_by, 0)) seq_gaps_to_add.append(sequence.GapRegion(0, 0)) gaps_to_remove.append(seq_gaps_to_remove) gaps_to_add.append(seq_gaps_to_add) return gaps_to_remove, gaps_to_add def _addElementByChain(self, seq, index, chain, element): """ Add the given element to the specified chain of a sequence. :param seq: The sequence to insert into. :type seq: sequence.CombinedChainProteinSequence :param index: The index to insert the element at. Note that this is the index in `seq`, not `chain`. :type index: int :param chain: The chain to insert into :type chain: sequence.ProteinSequence :param element: The element to insert :type element: str or residue.AbstractSequenceElement """ self._assertCanInsert(seq, [index]) seq.insertElementByChain(index, chain, element) self.signals.residuesAdded.emit([element]) @msv_utils.const def _getInvalidatedBonds(self, seqs): # See parent class for method documentation # This method only cares about inter-sequence disulfides, which can't # happen in a combined-chain alignment. Inter-chain disulfides are # always between chains in the same entry, so they'll always be within a # single sequence in a combined-chain alignment. return [], [] def _restoreInvalidatedBonds(self, invalidated_known_bonds, invalidated_pred_bonds): # See parent class for method documentation # This method isn't needed for combined-chain alignments because # _getInvalidatedBonds always returns empty lists pass
[docs]def get_contiguous_groups(nums): """ Group numbers in a given list by contiguity. Each group that is returned will be a list of numbers where every value is an int that only differs from its neighbors by one. e.g. [1, 2, 4] -> [[1, 2], [4]] [1, 2, 4, 5, 10] -> [[1, 2], [4, 5], [10]] :param nums: A list of numbers to group :type nums: list(int) :return: A list of groups of numbers, where the numbers in each group are contiguous :rtype: list(list(int)) """ assert len(nums) > 0 nums.sort() groups = [] start = nums[0] while start <= nums[-1]: group = [] for num in range(start, nums[-1] + 1): start = num + 1 if num not in nums: break group.append(num) if len(group): groups.append(group) return groups