# Source code for schrodinger.application.msv.structure_model

import collections
import contextlib
import copy
import itertools
import os
import typing
import warnings
import weakref
from collections import defaultdict
from collections import deque
from collections import namedtuple
from functools import partial

import schrodinger
from schrodinger import project
from schrodinger import structure
from schrodinger.application.msv import seqio
from schrodinger.application.msv.gui import gui_alignment
from schrodinger.application.msv.gui.viewconstants import Inclusion
from schrodinger.infra import util
from schrodinger.models import diffy
from schrodinger.protein import align
from schrodinger.protein import alignment
from schrodinger.protein import annotation
from schrodinger.protein import residue
from schrodinger.protein import seqres
from schrodinger.protein import sequence
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtWidgets
from schrodinger.structutils import analyze
from schrodinger.ui import maestro_ui
from schrodinger.ui.qt.appframework2 import maestro_callback
from schrodinger.utils import fileutils
from schrodinger.utils import scollections

# Handle returned by schrodinger.get_maestro(); presumably falsy/None when the
# module is not running inside Maestro -- TODO confirm against get_maestro().
maestro = schrodinger.get_maestro()
# Placeholder for a PyMOL handle; nothing in the visible code assigns it.
pymol = None
# NOTE(review): looks like a sentinel entry id used for the scratch entry --
# confirm against the code that consumes it.
SCRATCH_ENTRY_ID = -1


class _EntryData(dict):
    """
    Information about a single entry from the current Maestro project.

    :ivar has_seqres: Whether the entry has SEQRES records.
    :vartype has_seqres: bool
    """

    def __init__(self, has_seqres):
        """
        :param has_seqres: Whether the entry currently has SEQRES records.
        :type has_seqres: bool
        """
        super().__init__()
        self.has_seqres = has_seqres


def _gen_renumbered_res_map(seq, start, increment, preserve_icode):
    """
    Generate maps mapping old residue numbers to new residue numbers given
    a renumbering scheme (i.e. a sequence, a resnum to start, and an increment)
    Returns two maps, a map for all nonstructured residues and a map for all
    structured residues. These maps can be used to renumber residues using
    `seq.renumberResidues`.

    :param seq: The sequence to renumber.
    :type  seq: protein.sequence.AbstractSingleChainSequence

    :param start: The number to start the renumbering with
    :type  start: int

    :param increment: The amount to increment while numbering
    :type  increment: int

    :param preserve_icode: Whether to keep the inscode for the residues. If
        False, all inscodes will be set to " ".
    :type  preserve_icode: bool

    :rtype: tuple(dict[(int, str), (int, str)], dict[(int, str), (int,str)])
    """
    # Create map of old residue numbers to new residue numbers
    new_resnum = start
    nonst_resnum_mapping = {}  # nonstructured residues
    st_resnum_mapping = {}
    for res in seq:
        if res.is_res:
            new_inscode = res.inscode if preserve_icode else " "
            old_rescode = res.getChainKey()
            new_rescode = (new_resnum, new_inscode)
            if res.hasStructure():
                st_resnum_mapping[old_rescode] = new_rescode
            else:
                nonst_resnum_mapping[old_rescode] = new_rescode
            new_resnum += increment
    return nonst_resnum_mapping, st_resnum_mapping


class RenumberResiduesError(ValueError):
    """
    Raised when residues cannot be renumbered, e.g. when renumbering by
    template would need more insertion codes than are available.
    """
def _gen_renumbered_res_by_template_map(source_seq, template_seq):
    """
    Generate maps mapping old residue numbers to new residue numbers given
    a template sequence. Returns two maps, a map for all nonstructured
    residues and a map for all structured residues. These maps can be used to
    renumber residues using `seq.renumberResidues`.

    :param source_seq: input sequence to be renumbered
    :type source_seq: schrodinger.protein.sequence.ProteinSequence

    :param template_seq: template sequence
    :type template_seq: schrodinger.protein.sequence.ProteinSequence

    :return: (map for structureless residues, map for structured residues)
    :rtype: tuple(dict, dict)

    :raises RenumberResiduesError: if there aren't enough valid insertion codes
        to do the renumbering.
    """
    # Make a copy of the source sequence since we don't want to directly modify
    # it.
    source_seq_copy = sequence.ProteinSequence(str(source_seq))
    # Align the seq and template seq
    # NOTE(review): `template_seq` itself is placed in the alignment and then
    # has elements removed below, so the caller's object appears to be
    # modified -- confirm that callers pass a disposable template.
    aligner = align.MaxIdentityAligner()
    aln = alignment.ProteinAlignment([source_seq_copy, template_seq])
    aligner.run(aln)
    # Extract new residue numbers from template seq.  Template elements that
    # line up with gaps in the source have no source residue to renumber.
    template_seq.removeElements(
        [template_seq[g.idx_in_seq] for g in source_seq_copy.getGaps()])
    gap_length = 0
    newnums = []
    for res in template_seq:
        if not res.is_res:
            gap_length += 1
            continue
        if gap_length != 0:
            if not newnums:
                # Leading gaps.  Use numbers smaller than the next resnum.
                next_resnum = res.resnum
                for resnum in range(next_resnum - gap_length, next_resnum):
                    newnums.append((resnum, " "))
            else:
                # generate rescodes for the gaps
                first_rescode = newnums[-1]
                last_rescode = res.getChainKey()
                new_codes = sequence.gen_resnums_and_inscodes(
                    *first_rescode, *last_rescode)
                if len(new_codes) < gap_length:
                    raise RenumberResiduesError(
                        "Optimal alignment of template "
                        "sequence requires more insertion codes than "
                        "available.")
                newnums.extend(new_codes[:gap_length])
        newnums.append(res.getChainKey())
        gap_length = 0
    # Trailing template gaps never reach the branch above, so they add nothing
    # to `newnums`; zip() below then stops at the shorter input and the
    # corresponding trailing source residues are left out of both maps.

    # Create mapping of old residue numbers to new residue numbers
    nonst_resnum_mapping = {}  # nonstructured residues
    st_resnum_mapping = {}
    for s_res, newcode in zip(source_seq.residues(), newnums):
        old_rescode = s_res.getChainKey()
        if s_res.hasStructure():
            st_resnum_mapping[old_rescode] = newcode
        else:
            nonst_resnum_mapping[old_rescode] = newcode
    return nonst_resnum_mapping, st_resnum_mapping


def _gen_renumbered_res_by_antibody_cdr(seq, new_res_num_list):
    """
    Generate maps mapping old residue numbers to new residue numbers based on
    the AntibodyCDR scheme. Returns two maps, a map for all nonstructured
    residues and a map for all structured residues. These maps can be used to
    renumber residues using `seq.renumberResidues`.

    :param seq: input sequence to be renumbered
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :param new_res_num_list: List of residue numbers per the Antibody CDR
        scheme. Paired positionally with `seq.residues()`; pairing stops at the
        shorter of the two.
    :type new_res_num_list: List[str]

    :return: Maps for structureless residues and structured residues.
    :rtype: tuple(dict[(int, str), (int, str)], dict[(int, str), (int,str)])
    """
    nonst_resnum_mapping = {}  # nonstructured residues
    st_resnum_mapping = {}
    for s_res, newcode in zip(seq.residues(), new_res_num_list):
        _resnum, _inscode = annotation.parse_antibody_rescode(newcode)
        new_res_code = residue.ResidueChainKey(resnum=_resnum,
                                               inscode=_inscode)
        old_rescode = s_res.getChainKey()
        if s_res.hasStructure():
            st_resnum_mapping[old_rescode] = new_res_code
        else:
            nonst_resnum_mapping[old_rescode] = new_res_code
    return nonst_resnum_mapping, st_resnum_mapping


class _ChainData(QtCore.QObject):
    """
    Information about a single chain of a single entry from the current
    Maestro project.

    :cvar wsVisibilityChangeRequested: A signal emitted when the workspace
        visibility of a chain should be changed.  The `MaestroStructureModel`
        instance is responsible for changing the workspace visibility in
        response to this signal.  Emitted with:

        - the entry id of the chain (int)
        - the chain name (str)
        - whether the chain should be shown (True) or hidden (False) (bool)
        - whether the entry is currently in the workspace (bool)
    :vartype wsVisibilityChangeRequested: `QtCore.pyqtSignal`

    :ivar eid: The entry id of the chain.
    :vartype eid: int

    :ivar chain: The chain name.
    :vartype chain: str

    :ivar seqs: A set of all sequences representing this chain.
    :vartype seqs: `weakref.WeakSet`

    :ivar workspace_seq: The sequence representing this chain in the workspace
        alignment (i.e. the alignment shown in the workspace tab).  Note that
        this sequence also appears in `seqs`.
    :vartype workspace_seq: schrodinger.protein.sequence.Sequence
    """

    wsVisibilityChangeRequested = QtCore.pyqtSignal(int, str, bool, bool)
    _changingSeqVisibility = util.flag_context_manager(
        "_changing_seq_visibility")

    def __init__(self, eid, chain, all_res, vis_res, included):
        """
        :param eid: The entry id of the chain.
        :type eid: int

        :param chain: The chain name.
        :type chain: str

        :param all_res: A set of all residues in the chain, where each residue
            is a tuple of (residue number, insertion code).
        :type all_res: set(residue.ResidueChainKey)

        :param vis_res: A set of all residues in the chain that are currently
            visible in the Maestro workspace, where each residue is a tuple of
            (residue number, insertion code).
        :type vis_res: set(residue.ResidueChainKey)

        :param included: Whether the entry is currently included in the Maestro
            workspace.
        :type included: bool
        """
        super().__init__()
        self.eid = eid
        self.chain = chain
        self.seqs = weakref.WeakSet()
        self.workspace_seq = None
        self._all_res = all_res
        self._vis_res = vis_res
        self._included = included
        self._visibility = None
        self._visibility_slots = scollections.IdDict()
        self._changing_seq_visibility = False
        # Maps a residue chain key to every monitored residue object with
        # that key.
        self._rescode_to_residues_map = defaultdict(set)
        # Compute the initial visibility only once all state exists.
        self._updateVisibility()

    def renumberResiduesByTemplate(self, seq, template_seq):
        """
        Renumber `seq` based on the residue numbers of `template_seq`.

        :param seq: input sequence to be renumbered
        :type seq: schrodinger.protein.sequence.ProteinSequence

        :param template_seq: template sequence
        :type template_seq: schrodinger.protein.sequence.ProteinSequence
        """
        nonst_map, st_map = _gen_renumbered_res_by_template_map(
            seq, template_seq)
        self._renumberResiduesByMap(seq, nonst_map, st_map)

    def renumberResiduesByAntibodyCDR(self, seq, new_res_num_list):
        """
        Renumber residues in the sequence based on the given new numbers.

        :param seq: Sequence to be renumbered
        :type seq: protein.sequence.ProteinSequence

        :param new_res_num_list: List of residue numbers based on the Antibody
            CDR numbering scheme.
        :type new_res_num_list: List[str]
        """
        nonst_map, st_map = _gen_renumbered_res_by_antibody_cdr(
            seq, new_res_num_list)
        self._renumberResiduesByMap(seq, nonst_map, st_map)

    def _renumberResiduesByMap(self, seq, nonst_resnum_map, st_resnum_map):
        """
        Apply new residue numbering to every sequence linked to this chain
        and to the structure itself.

        :param seq: The sequence the renumbering was requested for.
        :param nonst_resnum_map: Old -> new rescodes for structureless
            residues; applied to `seq` only.
        :type nonst_resnum_map: dict
        :param st_resnum_map: Old -> new rescodes for structured residues;
            applied to all monitored sequences and to the structure.
        :type st_resnum_map: dict
        """
        # Renumber both non-structured and structured residues for `seq`
        seq.renumberResidues({**nonst_resnum_map, **st_resnum_map})
        # Renumber only structured residues for every other sequence
        for other_seq in self.seqs:
            if other_seq is seq:
                continue
            other_seq.renumberResidues(st_resnum_map)
        # Apply new residue numbering to the structure
        st = seq.getStructure()
        ch = st.chain[seq.structure_chain]
        for st_res in ch.residue:
            new_rescode = st_resnum_map.get((st_res.resnum, st_res.inscode))
            if new_rescode is None:
                continue
            st_res.resnum, st_res.inscode = new_rescode
        seq.setStructure(st)
        # Rebuild the rescode to residues map from scratch since the keys
        # changed.
        self._rescode_to_residues_map.clear()
        for cur_seq in self.seqs:
            self._updateRescodeMap(cur_seq)
            cur_seq.onStructureChanged()

    def renumberResidues(self, seq, start, increment, preserve_icode):
        """
        Renumber the residues for `seq`. New residue numbers and insertion
        codes will be propagated to structured residues in all `seq`s managed
        by this `_ChainData`.

        :param seq: The sequence to renumber.
        :type  seq: protein.sequence.AbstractSingleChainSequence

        :param start: The number to start the renumbering with
        :type  start: int

        :param increment: The amount to increment while numbering
        :type  increment: int

        :param preserve_icode: Whether to keep the inscode for the residues. If
            False, all inscodes will be set to " ".
        :type  preserve_icode: bool
        """
        nonst_map, st_map = _gen_renumbered_res_map(seq, start, increment,
                                                    preserve_icode)
        self._renumberResiduesByMap(seq, nonst_map, st_map)

    @property
    def included(self):
        """
        Whether the entry is currently included in the Maestro workspace.
        Setting this value recomputes the chain visibility.

        :type: bool
        """
        return self._included

    @included.setter
    def included(self, value):
        self._included = value
        self._updateVisibility()

    def updateVisRes(self, added, removed):
        """
        Update the set of residues that are currently visible in the Maestro
        workspace.

        :param added: The set of residues that were just added to the
            workspace.
        :type added: set

        :param removed: The set of residues that were just removed from the
            workspace.
        :type removed: set

        :note: Each residue must be a tuple of (residue number, insertion
            code).
        """
        # Removal is applied first, so a residue present in both sets ends up
        # visible.
        self._vis_res -= removed
        self._vis_res |= added
        self._updateVisibility()

    def clearVisRes(self):
        """
        Clear the set of residues that are currently visible in the Maestro
        workspace.
        """
        self._vis_res.clear()
        self._updateVisibility()

    def _updateVisibility(self):
        """
        Update self.visibility after an inclusion or a residue visibility
        change, and push the new value to all monitored sequences if it
        changed.
        """
        if not self._all_res:
            # This chain is about to be deleted so don't worry about the
            # visibility
            return
        if not self._included:
            new_visibility = Inclusion.Excluded
        elif len(self._vis_res) == len(self._all_res):
            new_visibility = Inclusion.FullyVisible
        elif not self._vis_res:
            new_visibility = Inclusion.NotVisible
        else:
            new_visibility = Inclusion.PartiallyVisible
        if new_visibility != self._visibility:
            self._visibility = new_visibility
            self._setSequenceVisibility(new_visibility)

    def _setSequenceVisibility(self, visibility):
        """
        Update the visibility of all sequences that represent this chain.

        :param visibility: The new visibility of this chain
        :type visibility: `Inclusion`
        """
        # The flag keeps _sequenceChangedVisibility from reacting to the
        # changes made here.
        with self._changingSeqVisibility():
            for cur_seq in self.seqs:
                cur_seq.visibility = visibility

    def updateAllRes(self, added, removed):
        """
        Update the set of residues that exist in this chain.

        :param added: The set of residues that were just added to the chain.
        :type added: set

        :param removed: The set of residues that were just removed from the
            chain.
        :type removed: set

        :note: Each residue must be a tuple of (residue number, insertion
            code).
        :note: `added` and `removed` are currently ignored (see TODOs).
        """
        # TODO: update self._all_res
        self._updateVisibility()
        # TODO: update sequences

    @property
    def visibility(self):
        """
        The visibility of this chain in the Maestro workspace.  May not be
        updated directly.  Use `updateVisRes` or `updateAllRes` instead.

        :type: `Inclusion`
        """
        return self._visibility

    def addSeq(self, seq):
        """
        Monitor a new sequence that represents this chain.

        :param seq: The sequence to monitor
        :type seq: schrodinger.protein.sequence.Sequence
        """
        if seq in self.seqs:
            # we're already monitoring this sequence
            return
        seq.visibility = self.visibility
        self.connectSeq(seq)
        self.seqs.add(seq)
        self._updateRescodeMap(seq)

    @QtCore.pyqtSlot(set)
    def _updateRescodeMap(self, residues):
        """
        Register residues in the rescode map.  Gaps and SEQRES-only residues
        are skipped.

        :param residues: Iterable of residues (a set from a signal, or a whole
            sequence).
        """
        for res in residues:
            if res.is_gap or res.seqres_only:
                continue
            self._rescode_to_residues_map[res.getChainKey()].add(res)

    @QtCore.pyqtSlot(set)
    def _removeFromRescodeMap(self, residues):
        """
        Remove residues from the rescode map.  Gaps and SEQRES-only residues
        are skipped.

        :param residues: Iterable of residues (a set from a signal, or a whole
            sequence).
        :raises KeyError: if a residue is not registered under its chain key.
        """
        for res in residues:
            if res.is_gap or res.seqres_only:
                continue
            self._rescode_to_residues_map[res.getChainKey()].remove(res)

    def removeSeq(self, seq):
        """
        Stop monitoring a sequence that represents this chain and strip it of
        structural data.

        :param seq: The sequence to stop monitoring
        :type seq: schrodinger.protein.sequence.Sequence
        """
        # make sure that the partial doesn't keep this object alive
        self.disconnectSeq(seq)
        seq.visibility = None
        seq._get_structure = None
        seq._set_structure = None
        seq.entry_id = None
        seq.structure_chain = None
        self.seqs.remove(seq)
        seq.onStructureChanged()
        self._removeFromRescodeMap(seq)

    def disconnect(self):
        """
        Disconnect all signals and slots between the sequences and this
        _ChainData.
        """
        for seq in self.seqs:
            self.disconnectSeq(seq)

    def connectSeq(self, seq):
        """
        Connect the signals of `seq` to the slots of this object.

        :param seq: The sequence to connect
        :type seq: schrodinger.protein.sequence.Sequence
        """
        seq.sequenceCopied.connect(self._sequenceCopied)
        seq.residuesAdded.connect(self._updateRescodeMap)
        seq.residuesRemoved.connect(self._removeFromRescodeMap)
        # Partial slots with references to self cause problems with garbage
        # collection.  To avoid this, we go through a weakref proxy.
        weak_self = weakref.proxy(self)
        vis_slot = partial(weak_self._sequenceChangedVisibility, seq)
        # Remember the partial so the same object can be disconnected later.
        self._visibility_slots[seq] = vis_slot
        seq.visibilityChanged.connect(vis_slot)

    def disconnectSeq(self, seq):
        """
        Disconnect the signals of `seq` from the slots of this object.

        :param seq: The sequence to disconnect
        :type seq: schrodinger.protein.sequence.Sequence
        """
        # NOTE(review): the stored partial is looked up but never removed from
        # `_visibility_slots`, so it (and the sequence it references) stays
        # referenced after disconnecting -- confirm whether that is intended.
        vis_slot = self._visibility_slots[seq]
        seq.visibilityChanged.disconnect(vis_slot)
        seq.sequenceCopied.disconnect(self._sequenceCopied)
        seq.residuesAdded.disconnect(self._updateRescodeMap)
        seq.residuesRemoved.disconnect(self._removeFromRescodeMap)

    def mapRescodeToResidues(self, chain_key):
        """
        Return residues matching a given residue key relative to entry and
        chain.

        :type chain_key: residue.ResidueChainKey
        :return: The set stored for `chain_key`.  Because the map is a
            `defaultdict`, an unknown key yields (and stores) an empty set.
        :rtype: set
        """
        return self._rescode_to_residues_map[chain_key]

    def getAllStructuredResiduesExcept(self, chain_keys):
        """
        Get all structured residues in this chain other than those specified.

        :param chain_keys: Residues to exclude from the return value
        :type chain_keys: set(residue.ResidueChainKey)

        :return: All other residues
        :rtype: list(residue.Residue)
        """
        residues = []
        for key, res in self._rescode_to_residues_map.items():
            if key not in chain_keys:
                residues.extend(res)
        return residues

    @util.skip_if("_changing_seq_visibility")
    def _sequenceChangedVisibility(self, seq):
        """
        When a sequence changes visibility, update all other sequences and the
        Maestro workspace.

        :param seq: The sequence that changed visibility.
        :type seq: sequence.Sequence

        :raises RuntimeError: if the new visibility is neither fully visible
            nor not visible.  No state is modified in that case.
        """
        new_visibility = seq.visibility
        # Validate before touching any state so a bad value cannot leave the
        # chain half-updated.
        if new_visibility is Inclusion.FullyVisible:
            show = True
        elif new_visibility is Inclusion.NotVisible:
            show = False
        else:
            raise RuntimeError("Cannot set sequence visibility to %s" %
                               new_visibility)
        old_inclusion = self._included
        self._included = True
        if show:
            # We can't do "self._vis_res = self._all_res" here, since that
            # would make both attributes point to the same set object.
            self._vis_res |= self._all_res
        else:
            self._vis_res.clear()
        self._visibility = new_visibility
        self._setSequenceVisibility(new_visibility)
        self.wsVisibilityChangeRequested.emit(self.eid, self.chain, show,
                                              old_inclusion)

    def chainRemoved(self):
        """
        Respond to the entry being removed from the project.  Mark all
        sequences as not having an associated structure and disconnect the
        sequences from visibility updates.
        """
        with self._changingSeqVisibility():
            # Iterate over a copy since removeSeq modifies self.seqs.
            for cur_seq in list(self.seqs):
                self.removeSeq(cur_seq)

    @QtCore.pyqtSlot(object, object)
    def _sequenceCopied(self, orig_seq, copy_seq):
        """
        When a sequence that we're monitoring is copied, make sure the copy
        can properly get and set the structure and that it gets monitored for
        changes.

        :param orig_seq: The sequence being copied.
        :type orig_seq: schrodinger.protein.sequence.Sequence

        :param copy_seq: The newly created copy.
        :type copy_seq: schrodinger.protein.sequence.Sequence
        """
        self.addSeq(copy_seq)
        copy_seq._get_structure = orig_seq._get_structure
        copy_seq._set_structure = orig_seq._set_structure
        copy_seq.structure_chain = orig_seq.structure_chain
class AbstractStructureModel(QtCore.QObject):
    """
    Manages interactions between sequences and their associated structures. A
    separate AbstractStructureModel subclass should be created for each
    program MSV can run alongside of (i.e. Maestro, PyMol, standalone).  For
    programs with a workspace, this class also maintains the workspace
    alignment, which contains sequences for all structures currently included
    in the workspace.

    This class should not be instantiated directly.  Instead `StructureModel`
    should be instantiated, which will create an object of the appropriate
    `AbstractStructureModel` subclass.

    Subclasses must implement `_readStructures`.  Subclasses for programs with
    a workspace should implement `getWorkspaceAlignment`,
    `getIncludedEntries`, and `importStructuresIntoWorkspace` and should set
    `IMPLEMENTS_GET_INCLUDED` to True.  Subclasses for programs with a concept
    of selected entries should implement `getSelectedEntries`, and should set
    `IMPLEMENTS_GET_SELECTED` to True.

    Note that there should be one structure model instance per panel, *not*
    one per tab.

    :cvar IMPLEMENTS_GET_SELECTED: Whether `getSelectedEntries` is implemented.
        Should be set to True in any subclass that implements this method.
    :vartype IMPLEMENTS_GET_SELECTED: bool

    :cvar IMPLEMENTS_GET_INCLUDED: Whether `getIncludedEntries` is implemented.
        Should be set to True in any subclass that implements this method.
    :vartype IMPLEMENTS_GET_INCLUDED: bool

    :cvar IMPLEMENTS_AUTOLOAD: Whether this class implements the concept of
        autoloading. Subclasses that implement autoloading should define
        `getMsvAutosaveProjectName` and emit `projectSaveRequested` and
        `projectLoadRequested` whenever an autoload or autosave is required.
    :vartype IMPLEMENTS_AUTOLOAD: bool

    :ivar workspaceColorsChanged: Signal emitted when colors of atoms in the
        workspace change, if the associated program has a concept of a
        workspace.
    :vartype workspaceColorsChanged: QtCore.pyqtSignal

    :ivar seqProjectTitlesChanged: Signal emitted when Project Table entry
        titles change for sequences. Emits a dict mapping sequences whose
        titles have changed to their new title in the Project Table and whether
        an immediate sequence name update should be performed.
    :vartype seqProjectTitlesChanged: QtCore.pyqtSignal(dict(
        sequence.ProteinSequence: str), bool)

    :ivar projectLoadRequested: Signal emitted when the MSV should autoload a
        project.

    :ivar projectSaveRequested: Signal emitted when the MSV should autosave a
        project. Emits if it should reset the last save file name.
    :vartype projectSaveRequested: QtCore.pyqtSignal(bool)

    :ivar structureWarningProduced: Signal emitted when loading a structure
        produces a warning. Emits the warning message.
    :vartype structureWarningProduced: QtCore.pyqtSignal(str)
    """

    IMPLEMENTS_GET_SELECTED = False
    IMPLEMENTS_GET_INCLUDED = False
    IMPLEMENTS_AUTOLOAD = False

    workspaceColorsChanged = QtCore.pyqtSignal()
    seqProjectTitlesChanged = QtCore.pyqtSignal(dict, bool)
    projectLoadRequested = QtCore.pyqtSignal()
    projectSaveRequested = QtCore.pyqtSignal(bool)
    structureWarningProduced = QtCore.pyqtSignal(str)

    def __init__(self, *args, **kwargs):
        """
        All arguments are forwarded to the `QtCore.QObject` constructor.  The
        GUI model starts unset; see `setGuiModel`.
        """
        super().__init__(*args, **kwargs)
        self._gui_model = None
def renumberResiduesByTemplate(self, seq, template_seq):
    """
    Renumber `seq` based on the residue numbers of `template_seq`.

    Only the first map returned by `_gen_renumbered_res_by_template_map`
    (the one for residues without structure) is applied.

    :param seq: input sequence to be renumbered
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :param template_seq: template sequence
    :type template_seq: schrodinger.protein.sequence.ProteinSequence

    :raises RenumberResiduesError: if there aren't enough valid insertion
        codes to do the renumbering.
    """
    resmap, _ = _gen_renumbered_res_by_template_map(seq, template_seq)
    seq.renumberResidues(resmap)
def renumberResidues(self, seq, start, increment, preserve_icode):
    """
    Renumbers residues for a sequence.

    Only the first map returned by `_gen_renumbered_res_map` (the one for
    residues without structure) is applied.

    :param seq: The sequence to renumber.
    :type  seq: protein.sequence.AbstractSingleChainSequence

    :param start: The number to start the renumbering with
    :type  start: int

    :param increment: The amount to increment while numbering
    :type  increment: int

    :param preserve_icode: Whether to keep the inscode for the residues. If
        False, all inscodes will be set to " ".
    :type  preserve_icode: bool
    """
    resmap, _ = _gen_renumbered_res_map(seq, start, increment, preserve_icode)
    seq.renumberResidues(resmap)
def renumberResiduesByAntibodyCDR(self, seq, new_res_num_list):
    """
    Renumber residues in the sequence based on the given new numbers.

    Only the first map returned by `_gen_renumbered_res_by_antibody_cdr`
    (the one for residues without structure) is applied.

    :param seq: Sequence to be renumbered
    :type seq: protein.sequence.ProteinSequence

    :param new_res_num_list: List of residue numbers based on the Antibody
        CDR numbering scheme.
    :type new_res_num_list: List[str]
    """
    resmap, _ = _gen_renumbered_res_by_antibody_cdr(seq, new_res_num_list)
    seq.renumberResidues(resmap)
def mapResidues(self, residues):
    """
    Map residues to all residues represented by the same structure residue.
    Note that only structures currently included in the workspace are
    considered. If a residue has no structure, the residue is included
    unchanged.

    This base implementation performs no mapping and returns its argument
    as is.

    :param residues: The residues to map
    :return: The same `residues` object that was passed in
    """
    return residues
def setGuiModel(self, gui_model):
    """
    Store the GUI model on this structure model.

    :param gui_model: The GUI model to store; replaces any previous value.
    """
    self._gui_model = gui_model
@QtCore.pyqtSlot(object, object)
def onPagesMutated(self, new_pages, old_pages):
    """
    Update state in response to gui_model.pages.mutated signal. Note that
    this method must be connected to using `getSignalsAndSlots` rather than
    `mutated.connect`.

    This base implementation does nothing.

    :param new_pages: The pages after the mutation
    :param old_pages: The pages before the mutation
    """
def renameSeq(self, seq, new_name):
    """
    Rename the specified sequence.

    :param seq: Sequence to be renamed
    :type seq: sequence.ProteinSequence

    :param new_name: New name for the sequence
    :type new_name: str

    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def getWorkspaceAlignment(self):
    """
    Return an alignment that contains all entries included in the workspace.
    The structure model will ensure that this alignment is always kept in
    sync with the workspace. Returns None if the associated program has no
    concept of a workspace (i.e. StandaloneStructureModel).

    This base implementation always returns None.

    :note: This method will always return a split-chain alignment regardless
        of the current split-chain view setting.

    :rtype: `alignment.BaseAlignment` or NoneType
    """
    return None
def getLinkedAlnSeqs(self, seq):
    """
    Return a set of linked sequences to the specified sequence.

    This base implementation always returns a new empty set.

    :type seq: sequence.ProteinSequence
    :rtype: set
    """
    return set()
def getSelectedEntries(self):
    """
    Returns a list of sequences for all entries that are currently selected
    in the project table.

    :rtype: list

    :raises NotImplementedError: if the associated program has no concept of
        a selected entry; always raised by this base implementation.
    """
    raise NotImplementedError
def getIncludedEntries(self):
    """
    Returns a list of sequences for all entries that are currently included
    in the workspace.

    :rtype: list

    :raises NotImplementedError: if the associated program has no concept of
        a workspace; always raised by this base implementation.
    """
    raise NotImplementedError
def getWorkspaceColors(self):
    """
    Returns a dict mapping residues to their color in the workspace.

    :rtype: dict

    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def setWorkspaceColors(self, color_map, all_atoms=False):
    """
    Sets the colors in the workspace to the colors given by color_map.

    :type color_map: dict

    :param all_atoms: Whether to color all atoms or just carbons
    :type all_atoms: bool

    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def importFile(self, filename):
    """
    Return sequences for the specified file. If the file contains structural
    data, then the sequences will have associated structures accessible via
    `sequence.getStructure()`.

    The file is read with `_readStructures` when
    `fileutils.get_structure_file_format` recognizes a structure format and
    with `_readSequences` otherwise.

    :param filename: The filename to read
    :type filename: str

    :return: All sequences. Note that these sequences *have not* been loaded
        into *any* alignment, including the workspace alignment. If
        `filename` contains structural data and the current structure model
        backend implements a workspace, see `importStructuresIntoWorkspace`,
        which imports a file and returns the corresponding workspace
        alignment sequences.
    :rtype: list(sequence.Sequence)

    :raise IOError: If there was an error importing the file.
    """
    if fileutils.get_structure_file_format(filename) is not None:
        return self._readStructures(filename)
    return self._readSequences(filename)
def importFiles(self, filenames):
    """
    Return sequences for all specified files. If any of the files contain
    structural data, then those sequences will have associated structures
    accessible via `sequence.getStructure()`.

    :param filenames: The filenames to read
    :type filenames: iterable

    :return: All imported sequences, in file order. Note that these sequences
        *have not* been loaded into *any* alignment, including the workspace
        alignment.
    :rtype: list(sequence.Sequence)

    :raise TypeError: If `filenames` is a single string rather than an
        iterable of strings.
    :raise IOError: If there was an error importing the files.
    """
    # A bare string is itself iterable and would be read one character at a
    # time, so reject it explicitly.
    if isinstance(filenames, str):
        raise TypeError("importFiles expects a non-string iterable. Try "
                        "importFile instead.")
    seqs = []
    for cur_file in filenames:
        seqs.extend(self.importFile(cur_file))
    return seqs
def _readStructures(self, filename): """ Return sequences for the specified file, which must contain structural data. The sequences will have associated structures accessible via `sequence.getStructure()`. :param filename: The filename to read :type filename: str :return: All sequences :rtype: list(sequence.Sequence) """ raise NotImplementedError def _readSequences(self, filename): """ Return sequences for the specified file, which does not contain structural data. :param filename: The filename to read :type filename: str :return: All sequences :rtype: list(sequence.Sequence) """ return seqio.read_sequences(filename) def _convertStructure(self, st, *args, **kwargs): """ Return the sequences converted from the given `st`. args and kwargs will be passed to seqio.StructureConverter.convert. """ with seqio.catch_sequence_warnings() as warn_catcher: seqs = seqio.StructureConverter.convert(st, *args, **kwargs) if warn_catcher.message: # This can be called during panel init, so use a single-shot timer # so the slot doesn't run until init is done QtCore.QTimer.singleShot( 0, lambda: self.structureWarningProduced.emit(warn_catcher. message)) return seqs
def importStructuresIntoWorkspace(self, filename):
    """
    Import every structure in the given file into the workspace, including
    only the first structure.

    This implementation always raises `NotImplementedError`.

    :param filename: The filename to read
    :type filename: str

    :return: Sequences from the workspace alignment that correspond to the
        newly imported structures.
    :rtype: list(sequence.Sequence)

    :raise NotImplementedError: Always
    """
    raise NotImplementedError()
@classmethod
def generateEntryResidueASL(cls, residues_by_entry):
    """
    Generate an ASL string for the given entry IDs and residues.

    Entries are processed in sorted entry id order. Entries for which
    `generateResidueASL` produces no ASL (e.g. no residues, or no residues
    with structure) are left out of the result.

    :param residues_by_entry: Mapping of entry id to residues
    :type residues_by_entry: dict[str, list(protein.residue.Residue)]

    :return: The per-entry ASL clauses joined with " OR ", or an empty
        string if no entry produced any ASL.
    :rtype: str
    """
    asl_parts = []
    for eid in sorted(residues_by_entry):
        entry_asl = cls.generateResidueASL(residues_by_entry[eid])
        # generateResidueASL returns "" (not None) when there is nothing to
        # select; skip those so we never emit "(entry.id N AND ())"
        if not entry_asl:
            continue
        asl_parts.append(f'(entry.id {eid} AND ({entry_asl}))')
    return " OR ".join(asl_parts)
@staticmethod
def generateResidueASL(residues):
    """
    Build an ASL string that matches the given residues. Residues should
    be from the same entry. Residues without a corresponding structure
    residue are ignored.

    :type residues: collections.abc.Iterable(protein.residue.Residue)

    :return: The ASL clauses joined with " OR "; an empty string if there
        is nothing to select.
    :rtype: str
    """
    struc_residues = []
    for res in residues:
        sres = res.sequence.getStructureResForRes(res)
        if sres is not None:
            struc_residues.append(sres)
    # There are bugs with selecting multiple negative residue numbers at
    # the same time (e.g. "res.num -31, -32".  See SHARED-7239).  Using the
    # ASL for each negative residue separately works around the bug.
    negative = []
    non_negative = []
    for sres in struc_residues:
        if sres.resnum < 0:
            negative.append(sres)
    for sres in struc_residues:
        if sres.resnum >= 0:
            non_negative.append(sres)
    asl_parts = [sres.getAsl() for sres in negative]
    if non_negative:
        asl_parts.append(analyze.generate_residue_asl(non_negative))
    return " OR ".join(asl_parts)
@classmethod
def generateMultiEntryResidueASL(cls, residues):
    """
    Build an ASL string that matches the given residues, which may come
    from different entries. Gaps and residues whose sequence has no entry
    id are skipped; the rest are grouped by entry id and handed to
    `generateEntryResidueASL`.

    :type residues: collections.abc.Iterable(protein.residue.Residue)

    :return: The value returned by `generateEntryResidueASL`
    """
    by_entry = collections.defaultdict(list)
    for res in residues:
        if res.is_gap:
            continue
        eid = res.sequence.entry_id
        if eid is not None:
            by_entry[eid].append(res)
    return cls.generateEntryResidueASL(by_entry)
def applyWorkspaceSelectionToSeqs(self, aln, seqs=None):
    """
    Select, in the given sequences, every residue that is selected in the
    workspace. Sequences without structures, or with structures that
    aren't currently included in the workspace, are ignored.

    For structure models without a workspace this method does nothing.

    :param aln: The alignment to select the residues in
    :type aln: gui_alignment._ProteinAlignment

    :param seqs: The sequences to select residues in. If not given, all
        sequences in `aln` will be used.
    :type seqs: Iterable(sequence.Sequence)
    """
    # Intentionally a no-op here
def delayedSyncFromMsvToWorkspace(self, aln):
    """
    Replace the workspace residue selection with the residue selection of
    the given alignment.

    When new entries are included in the workspace, their residue
    selection is not automatically synchronized until selection is changed
    in either the workspace (in which case residue selection from the
    workspace is applied to the MSV) or the MSV (in which case residue
    selection from the MSV active tab is applied to the workspace and to
    the other MSV tabs). This method forces selection to be synchronized
    immediately. It also removes workspace selection for any entries
    without a linked sequence in the given alignment.

    For structure models without a workspace this method does nothing.

    :param aln: The alignment to take residue selection from
    :type aln: gui_alignment._ProteinAlignment
    """
    # Intentionally a no-op here
def getStructSeq(self, entry_id, chain_name):
    """
    Return a sequence for the chain structure identified by `entry_id` and
    `chain_name`. The sequence will *not* be monitored by the structure
    model in any way and will not be kept up to date with any changes to
    the structure.

    Structure models without a workspace always raise a ValueError, which
    is what this implementation does.

    :param entry_id: The entry id of the structure.
    :type entry_id: int or str

    :param chain_name: The name of the chain to create a sequence for.
    :type chain_name: str

    :return: The requested sequence
    :rtype: sequence.Sequence

    :raises ValueError: If the specified entry_id or chain don't exist.
    """
    message = "No workspace."
    raise ValueError(message)
def linkSequence(self, seq, entry_id, chain_name):
    """
    Link a sequence to the structure identified by `entry_id` and
    `chain_name`.

    Structure models without a workspace always raise a ValueError, which
    is what this implementation does.

    :param seq: The sequence to associate with a structure.
    :type seq: sequence.Sequence

    :param entry_id: The entry id of the structure to associate
    :type entry_id: str or int

    :param chain_name: The name of chain of the structure to associate
        with the sequence.
    :type chain_name: str

    :raises ValueError: If the specified entry_id or chain don't exist.
    """
    message = "No workspace."
    raise ValueError(message)
class StructureModelMeta(type):
    """
    Metaclass that answers `isinstance` checks on behalf of
    `AbstractStructureModel`.
    """

    def __instancecheck__(cls, instance):
        """
        Make sure that any object instantiated via `StructureModel` is
        reported as an instance of `StructureModel`.

        :param instance: The object being checked

        :return: Whether `instance` is an `AbstractStructureModel`
        :rtype: bool
        """
        is_structure_model = isinstance(instance, AbstractStructureModel)
        return is_structure_model
class StructureModel(metaclass=StructureModelMeta):
    """
    Factory class: instantiating it returns the appropriate
    `AbstractStructureModel` subclass for the current environment.
    """

    def __new__(cls, parent, undo_stack):
        """
        :param parent: The Qt parent widget. Only passed on to the Maestro
            structure model.
        :type parent: QtWidgets.QWidget

        :param undo_stack: The undo stack. Only passed on to the Maestro
            structure model.
        :type undo_stack: schrodinger.application.msv.command.UndoStack
        """
        # Maestro takes precedence over PyMol; standalone is the fallback
        if maestro:
            return MaestroStructureModel(parent, undo_stack)
        if pymol:
            return PyMolStructureModel()
        return StandaloneStructureModel()
class NewResInfo(namedtuple("NewResInfo", "resnum inscode resname is_na")):
    """
    Description of a new residue added to the workspace structure during a
    residuesChanged signal.
    """

    # TODO MSV-2379: Consider all items in WHResidue.d_hash

    def __new__(cls, resnum, inscode, resname, is_na=None):
        """
        :param resnum: The residue number
        :param inscode: The insertion code
        :param resname: The residue name
        :param is_na: Whether the residue is a nucleic acid. Optional;
            defaults to None.
        """
        return super().__new__(cls, resnum, inscode, resname, is_na)

    def chainKey(self):
        """
        A key to uniquely identify the residue within the chain
        """
        chain_key = residue.ResidueChainKey(self.resnum, self.inscode)
        return chain_key
class WHResInfo(typing.NamedTuple):
    """
    Tuple to hash WHResidue appropriately. Used to create `NewResInfo`
    objects.

    Note: not using WHResidue.getHash() because it also considers molecule
    number
    """
    # TODO MSV-2379: Consider all items in WHResidue.d_hash
    eid: int
    chain: str
    resnum: int
    inscode: str
    resname: str
    is_na: bool

    @classmethod
    def fromWHRes(cls, whres):
        """
        Generate a `WHResInfo` object from a `WHResidue` object.

        :param whres: The workspace hub residue to describe

        :return: A new instance of the class this method is called on. The
            entry id is converted to an int, and `is_na` is taken from
            `whres.isDNA() or whres.isRNA()`.
        :rtype: WHResInfo
        """
        # Build through `cls` so that subclasses get instances of their own
        # type rather than of the base class
        return cls(int(whres.getEntryID()), whres.getChain(),
                   whres.getResNum(), whres.getInsCode(), whres.getPDBName(),
                   whres.isDNA() or whres.isRNA())

    def entryKey(self):
        """
        A key to uniquely identify the entry chain

        :rtype: tuple(int, str)
        """
        return (self.eid, self.chain)

    def chainKey(self):
        """
        A key to uniquely identify the residue within the chain
        """
        return residue.ResidueChainKey(self.resnum, self.inscode)

    def residueKey(self):
        """
        A key to uniquely identify the residue and chain
        """
        return residue.ResidueKey(self.eid, self.chain, self.resnum,
                                  self.inscode)
class MaestroStructureModel(AbstractStructureModel):
    """
    The structure model that `StructureModel` returns when running inside
    Maestro.
    """
    IMPLEMENTS_GET_SELECTED = True
    IMPLEMENTS_GET_INCLUDED = True
    IMPLEMENTS_AUTOLOAD = True
    # valid PDB names for protein residues
    VALID_AA_NAMES = set(residue.AMINO_ACIDS_THREE_LETTER.keys()) - {"UNK"}
    VALID_NA_NAMES = set(residue.NA_THREE_LETTER.keys())
    # Each of the following is used as `with self._xxx():`.
    # NOTE(review): presumably each one toggles the named boolean instance
    # attribute for the duration of the block - confirm against
    # util.flag_context_manager
    _changingMaestroVisibility = util.flag_context_manager(
        "_changing_maestro_visibility")
    _updatingSeqres = util.flag_context_manager("_updating_seqres")
    _updatingColor = util.flag_context_manager("_updating_color")
    _syncingSelection = util.flag_context_manager("_syncing_selection")
    _syncingInclusion = util.flag_context_manager("_syncing_inclusion")
    _renamingProjectEntries = util.flag_context_manager(
        '_renaming_project_entries')
def __init__(self, parent, undo_stack):
    """
    Set up state tracking, hook up to the Maestro and workspace hubs, and
    populate the workspace alignment from the entries currently included
    in the workspace.

    :param parent: The Qt parent widget
    :type parent: QtWidgets.QWidget

    :param undo_stack: The undo stack. This will be cleared whenever
        Maestro initiates a change that we can't undo.
    :type undo_stack: schrodinger.application.msv.command.UndoStack
    """
    super().__init__(parent)
    self.undo_stack = undo_stack

    # Flags describing what operation is currently in progress
    self._syncing_selection = False
    self._syncing_inclusion = False
    self._changing_maestro_visibility = False
    self._closing_project = False
    self._updating_seqres = False
    self._updating_color = False

    # Sequences that shouldn't have their titles synchronized with the
    # Maestro entry name.  Everything else about the sequences is still
    # synchronized.
    self.unsynched_seqs = set()
    # a dictionary of [entry id as int][chain name] = _ChainData for chain
    self._entry_chain_map = {}
    self._name_synch_aln = None
    self._renaming_project_entries = False
    self._request_immediate_rename = False
    self._valid_seqres_names = self.VALID_AA_NAMES | self.VALID_NA_NAMES

    # Workspace hub connections
    self._workspace_hub = maestro_ui.WorkspaceHub.instance()
    for hub_signal, slot in self._getWHSignalsAndSlots(self._workspace_hub):
        hub_signal.connect(slot)

    # Maestro hub connections
    self._maestro_hub = maestro_ui.MaestroHub.instance()
    for hub_signal, slot in self._getMHSignalsAndSlots(self._maestro_hub):
        hub_signal.connect(slot)

    self._workspace_aln = gui_alignment.GuiProteinAlignment(
        is_workspace=True)
    self._split_workspace_aln = self._workspace_aln
    self._inclusion_changing = set()
    self._delayed_sync_eids = set()

    # Seed the workspace alignment with what's already in the workspace
    included_eids = maestro.get_included_entry_ids()
    self._initEidsInWorkspaceAln(included_eids)
    self.applyWorkspaceSelectionToSeqs(self._workspace_aln)

    for cb_info, cb_slot in self._getMaestroCallbacks():
        cb_info.add(cb_slot)
def renumberResiduesByTemplate(self, seq, template_seq):
    """
    Renumber `seq` based on the residue numbers of `template_seq`. The
    residue selection of the current page is captured beforehand and
    passed to `_syncSelectionToMaestro` afterwards.

    :param seq: input sequence to be renumbered
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :param template_seq: template sequence
    :type template_seq: schrodinger.protein.sequence.ProteinSequence
    """
    sel_model = self._gui_model.current_page.split_aln.res_selection_model
    prior_selection = sel_model.getSelection()
    if seq.entry_id is None:
        # No linked structure, so the parent class implementation suffices
        super().renumberResiduesByTemplate(seq, template_seq)
    else:
        chains = self._entry_chain_map[int(seq.entry_id)]
        chain_data = chains[seq.structure_chain]
        chain_data.renumberResiduesByTemplate(seq, template_seq)
    self._syncSelectionToMaestro(prior_selection)
def renumberResidues(self, seq, start, increment, preserve_icode):
    """
    Renumber the residues of a sequence and propagate the renumbering to
    all sequences linked to the same chain. See
    `_ChainData.renumberResidues` for more documentation.

    The residue selection of the current page is captured beforehand and
    passed to `_syncSelectionToMaestro` afterwards.
    """
    sel_model = self._gui_model.current_page.split_aln.res_selection_model
    prior_selection = sel_model.getSelection()
    if seq.entry_id is None:
        # No linked structure, so the parent class implementation suffices
        super().renumberResidues(seq, start, increment, preserve_icode)
    else:
        chains = self._entry_chain_map[int(seq.entry_id)]
        chain_data = chains[seq.structure_chain]
        chain_data.renumberResidues(seq, start, increment, preserve_icode)
    self._syncSelectionToMaestro(prior_selection)
def renumberResiduesByAntibodyCDR(self, seq, new_res_num_list):
    """
    Renumber `seq` using `new_res_num_list`. If the sequence is linked to
    an entry, the renumbering is delegated to the chain data for its
    chain; otherwise the parent class implementation is used.

    The residue selection of the current page is captured beforehand and
    passed to `_syncSelectionToMaestro` afterwards.

    :param seq: The sequence to renumber
    :param new_res_num_list: Passed through to the renumbering
        implementation
    """
    sel_model = self._gui_model.current_page.split_aln.res_selection_model
    prior_selection = sel_model.getSelection()
    if seq.entry_id is None:
        super().renumberResiduesByAntibodyCDR(seq, new_res_num_list)
    else:
        chains = self._entry_chain_map[int(seq.entry_id)]
        chain_data = chains[seq.structure_chain]
        chain_data.renumberResiduesByAntibodyCDR(seq, new_res_num_list)
    self._syncSelectionToMaestro(prior_selection)
def setGuiModel(self, gui_model):
    """
    Set the GUI Model that this structure model should keep up to date.
    This method will update any view pages in the GUI model. If a workspace
    page is present, it will be updated. Otherwise, a new workspace page
    will be created.

    :param gui_model: The GUI model to keep up to date.
    :type gui_model: gui.gui_model.MsvGuiModel
    """
    super().setGuiModel(gui_model)
    if gui_model.hasWorkspacePage():
        ws_page = gui_model.getWorkspacePage()
        # Note that _updateWorkspacePage must be called before
        # _onWorkspaceSplitChainViewChanged so that _updateWorkspacePage is
        # always run with a split-chain workspace alignment.
        with self._syncingSelection():
            self._updateWorkspacePage(ws_page)
    else:
        # No workspace page yet: create one showing our workspace alignment
        ws_page = gui_model.addWorkspacePage(self._workspace_aln)
    ws_page.split_chain_viewChanged.connect(
        self._onWorkspaceSplitChainViewChanged)
    # make sure we're up to date with the current split-chain view setting
    self._onWorkspaceSplitChainViewChanged()
    self.updateViewPages(gui_model)
    # connect residue selection changed signal at end (after WS selection
    # has been synced to MSV)
    for page in gui_model.pages:
        page.aln_signals.resSelectionChanged.connect(
            self._alignmentSelectionChanged)
def _updateWorkspacePage(self, page): """ Update an existing workspace page with the current workspace sequences. This should be called whenever the MSV panel is reopened, since it doesn't monitor workspace changes while it's closed. :param page: The workspace page to update :type page: gui.gui_model.PageModel """ if not page.is_workspace: msg = "This method should only be called on a workspace page" raise ValueError(msg) # aln is guaranteed to be a split-chain alignment since we haven't # looked at PageModel.split_chain_view yet aln = self._workspace_aln orig_aln = page.split_aln orig_seqs_map = defaultdict(dict) for seq in orig_aln: orig_seqs_map[seq.entry_id][seq.chain] = seq new_seqs = [seq for seq in aln] # Make all seqs unparented because they'll be put into a temporary # alignment in _alignSeqPair orig_aln.clear() aln.clear() for seq in new_seqs: try: orig_seq = orig_seqs_map[seq.entry_id][seq.chain] except KeyError: pass else: # transfer old gaps self._alignSeqPair(orig_aln, orig_seq, seq) aln.addSeqs(new_seqs) self.applyWorkspaceSelectionToSeqs(aln) page.aln = aln
def applyWorkspaceSelectionToSeqs(self, aln, seqs=None):
    """
    Select, in `aln`, the residues of `seqs` that are selected in the
    workspace. Nothing happens if no such residues are found. See the
    parent class for further documentation.

    :param aln: The alignment to select the residues in
    :type aln: gui_alignment._ProteinAlignment

    :param seqs: The sequences to select residues in. If not given, all
        sequences in `aln` will be used.
    :type seqs: Iterable(sequence.Sequence)
    """
    target_seqs = aln if seqs is None else seqs
    res_to_select = self._getResiduesSelectedInWorkspace(target_seqs)
    if res_to_select:
        sel_model = aln.res_selection_model
        with self._syncingSelection():
            sel_model.setSelectionState(res_to_select, True, _undoable=False)
            # make sure that the selection update happens while we're in
            # the _syncingSelection block so we know to ignore it
            aln.res_selection_model.forceSelectionUpdate()
def _getResiduesSelectedInWorkspace(self, seqs): """ Find all residues in the given sequences that correspond to selected residues in the workspace. :param seqs: The sequences to find select residues for. :type seqs: Iterable(sequence.Sequence) :return: The selected residues :rtype: set(residue.Residue) """ structured_seqs = [seq for seq in seqs if seq.entry_id] if not structured_seqs: return set() ws_sel = self._workspace_hub.getSelAtomsToResSet() whres_info = self._whresSet(ws_sel, set()) res_keys = {res.residueKey() for res in whres_info} selected = set() for cur_seq in structured_seqs: eid = cur_seq.entry_id seq_sel = { res for res in cur_seq if not res.is_gap and residue.get_residue_key( res, eid, res.structure_chain) in res_keys } selected.update(seq_sel) return selected
def updateViewPages(self, gui_model):
    """
    Re-link the linked sequences in every view (i.e. non-workspace) page
    of the GUI model, then regenerate each page's combined-chain alignment
    and apply the workspace selection to it. This should be called
    whenever the MSV panel is reopened, since it doesn't monitor structure
    changes while it's closed.

    :param gui_model: The model to update.
    :type gui_model: gui.gui_model.MsvGuiModel
    """
    for page in gui_model.getViewPages():
        linked_seqs = self._getLinkedSequences(page.split_aln)
        for seq in linked_seqs:
            # A ValueError means the entry or chain was deleted while the
            # MSV was closed, in which case there's nothing to re-link
            with contextlib.suppress(ValueError):
                self.linkSequence(seq, seq.entry_id, seq.chain)
        page.regenerateCombinedChainAlignment()
        self.applyWorkspaceSelectionToSeqs(page.aln)
def _alignSeqPair(self, aln, ref_seq, other_seq): """ Align two seqs. They should be unparented (i.e. not already in an alignment) to avoid undefined behavior. Gaps in `other_seq` that don't appear in `ref_seq` will be removed. """ if ref_seq in aln or other_seq in aln: raise ValueError("Neither seq should be in an alignment.") aln_class = type(aln) tmp_alignment = aln_class([ref_seq, other_seq]) align.BiopythonPairwiseAligner().run(tmp_alignment) to_remove = [] # Collect gaps that appear only in other_seq for ref_elem, other_elem in tmp_alignment.columns(): if other_elem.is_gap and ref_elem.is_res: to_remove.append(other_elem) # Remove all gaps at once to avoid shifting columns tmp_alignment.removeElements(to_remove) tmp_alignment.clear()
@QtCore.pyqtSlot(object, object)
def onPagesMutated(self, new_pages, old_pages):
    """
    Keep the residue selection signal connections in step with the pages:
    connect `_alignmentSelectionChanged` for added pages and disconnect it
    for removed pages.

    :param new_pages: The pages after the change
    :param old_pages: The pages before the change
    """
    added, removed, _moved = diffy.get_diff(new_pages, old_pages)
    on_selection_changed = self._alignmentSelectionChanged
    for new_page, _ in added:
        new_page.aln_signals.resSelectionChanged.connect(on_selection_changed)
    for old_page, _ in removed:
        old_page.aln_signals.resSelectionChanged.disconnect(
            on_selection_changed)
@QtCore.pyqtSlot()
def _onWorkspaceSplitChainViewChanged(self):
    """
    Point `_workspace_aln` at the alignment currently shown by the
    workspace page of the GUI model.
    """
    ws_page = self._gui_model.getWorkspacePage()
    self._workspace_aln = ws_page.aln
def getStructSeq(self, entry_id, chain_name):
    """
    Return an unmonitored sequence for the given chain of the given entry.
    See the parent class for further documentation.

    :param entry_id: The entry id of the structure.
    :type entry_id: int or str

    :param chain_name: The name of the chain to create a sequence for.
    :type chain_name: str

    :return: The requested sequence
    :rtype: sequence.Sequence

    :raises ValueError: If the specified entry_id or chain don't exist.
    """
    # Only the sequences are needed from the returned tuple
    entry_seqs, _row, _struc, _proj = self._getUnlinkedSeqsForEid(entry_id)
    return self._getSeqForChain(entry_seqs, chain_name)
def _getSeqForChain(self, seqs, chain_name): """ Given a list of sequences, find the sequence with the desired chain name. :param seqs: The sequences to search. :type seqs: Iterable(sequence.Sequence) :param chain_name: The name of the chain to find. :type chain_name: str :return: The first sequence with the specified chain name. :rtype: sequence.Sequence :raises ValueError: If no sequence with the specified chain name is present. """ for seq in seqs: if seq.chain == chain_name: return seq raise ValueError(f"Chain {chain_name} not found")
def linkSequence(self, seq, entry_id, chain_name):
    """
    Link `seq` to the chain `chain_name` of the project entry `entry_id`.
    See the parent class for further documentation.

    The sequence is given the entry id and chain name, aligned (via a
    copy) to a sequence freshly generated from the structure, and has
    residue information copied from the residues it matches. It is then
    added to the chain data for the chain. The undo stack is cleared at
    the end.

    :param seq: The sequence to associate with a structure.
    :type seq: sequence.Sequence

    :param entry_id: The entry id of the structure to associate
    :type entry_id: str or int

    :param chain_name: The name of chain of the structure to associate
        with the sequence.
    :type chain_name: str

    :raises ValueError: If the specified entry_id or chain don't exist.
    """
    # See parent class for method documentation
    # _getUnlinkedSeqsForEid and _getSeqForChain will raise ValueErrors if
    # the specified entry id or chain name don't exist, so call those to
    # sanity check the input values before we start making changes.
    entry_id = int(entry_id)
    eid_seqs, row, struc, proj = self._getUnlinkedSeqsForEid(entry_id)
    struc_seq = self._getSeqForChain(eid_seqs, chain_name)
    # initialize data for the project entry if necessary
    if entry_id not in self._entry_chain_map:
        vis_res = self._getVisResIfNeeded({entry_id})
        chain_data = self._initDataForEntry(entry_id, row, struc, eid_seqs,
                                            vis_res)
        self._entry_chain_map[entry_id] = chain_data
    # sequences store the entry id as a string
    seq.entry_id = str(entry_id)
    seq.structure_chain = chain_name
    # Copy before adding to chain data to avoid sequenceCopied side effects
    copied_seq = copy.deepcopy(seq)
    aligner = align.MaxIdentityAligner()
    tmp_alignment = gui_alignment.GuiProteinAlignment(
        [struc_seq, copied_seq])
    aligner(tmp_alignment)
    # Change residue numbers before adding to chain data
    self._transferResidueInformation(from_seq=struc_seq,
                                     to_seq=seq,
                                     aligned_seq=copied_seq)
    self._addSeqsToChainData([seq], int(entry_id), proj)
    # Residue map needs entry ID and structure to be set
    seq.generateResidueMap()
    seq.onStructureChanged()
    if row.title != seq.name:
        # if the sequence has a different title than the Maestro entry,
        # don't try to keep the titles synchronized
        self.unsynched_seqs.add(seq)
    # synchronize residue selection the next time the user changes it
    self._delayed_sync_eids.add(entry_id)
    self.undo_stack.clear()
def _transferResidueInformation(self, *, from_seq, aligned_seq, to_seq): """ Transfer residue information from one sequence to another using `aligned_seq` as a reference. `aligned_seq` should have the exact same sequence of residues as `to_seq` and be aligned to `from_seq`. Any residue in `aligned_seq` that is aligned to a matching residue in `from_seq` will have its corresponding residue in `to_seq` transformed to match. For example, if we have the following arguments: `from_seq`: ATCG `aligned_seq`:AY~G `to_seq`: AYG The `A` and `G` residues of `to_seq` will have the same residue numbers, insertion codes, and seqres_only value as the `A` and `G` in `from_seq`. :param from_seq: The sequence to transfer residue information from. Should be aligned to `to_seq`. :type from_seq: sequence.Sequence :param to_seq: The sequence to transfer residue information to. Should have the same sequence of residues as `aligned_seq` :type to_seq: sequence.Sequence :param aligned_seq: A copy of `to_seq` that is aligned to `from_seq`. :type aligned_seq: sequence.Sequence """ unaligned_residues = set() ssa = [] for seq_res, copied_res in zip(to_seq.residues(), aligned_seq.residues()): struc_res = from_seq[copied_res.idx_in_seq] if (struc_res.is_gap or copied_res.type.short_code != struc_res.type.short_code): unaligned_residues.add(seq_res) ssa.append(None) else: seq_res.resnum = struc_res.resnum seq_res.inscode = struc_res.inscode seq_res.seqres_only = struc_res.seqres_only ssa.append(struc_res.secondary_structure) for res in unaligned_residues: res.resnum = None res.inscode = None res.seqres_only = True to_seq.setSSA(ssa)
def getAssociatedChainName(self, seq):
    """
    Look up the name of the chain that a sequence is associated with.

    :param seq: The sequence to look up
    :type seq: sequence.Sequence

    :return: The associated chain name. None if `seq` doesn't have a
        structure or isn't found in the chain data of its entry.
    :rtype: str or NoneType
    """
    if not seq.hasStructure():
        return None
    chains = self._entry_chain_map[int(seq.entry_id)]
    matching_names = (chain_name
                      for chain_name, chain_data in chains.items()
                      if seq in chain_data.seqs)
    return next(matching_names, None)
def unlinkSequence(self, seq):
    """
    Unlink a sequence from its structure: remove it from the chain data
    that holds it and clear its residue map.

    :param seq: The sequence to unlink.
    :type seq: sequence.Sequence
    """
    chains = self._entry_chain_map[int(seq.entry_id)]
    for chain_data in chains.values():
        if seq not in chain_data.seqs:
            continue
        chain_data.removeSeq(seq)
    seq.setResidueMap({})
def disconnect(self):
    """
    Disconnect Maestro callbacks and Workspace Hub signals, drop the
    reference to the workspace hub, and disconnect all chain data.
    """
    for cb_info, cb_slot in self._getMaestroCallbacks():
        cb_info.remove(cb_slot)
    hub = self._workspace_hub
    for hub_signal, hub_slot in self._getWHSignalsAndSlots(hub):
        hub_signal.disconnect(hub_slot)
    self._workspace_hub = None
    for entry_chains in self._entry_chain_map.values():
        for chain_data in entry_chains.values():
            chain_data.disconnect()
def _getMaestroCallbacks(self):
    """
    List the Maestro callbacks this model uses, paired with the method
    that handles each one.

    :rtype: list(tuple(maestro_callback.CallbackInfo, callable))
    """
    callbacks = maestro_callback.CALLBACK_FUNCTIONS
    close_cb = callbacks[maestro_callback.PROJECT_CLOSE_CALLBACK]
    update_cb = callbacks[maestro_callback.PROJECT_UPDATE_CALLBACK]
    workspace_cb = callbacks[maestro_callback.WORKSPACE_CHANGED_CALLBACK]
    return [
        (close_cb, self._projectClose),
        (update_cb, self._projectChanged),
        (workspace_cb, self.onWorkspaceChanged),
    ]  # yapf: disable


def _getMHSignalsAndSlots(self, mh):
    """
    List the maestro hub signals this model uses, paired with the slot
    for each one.

    :param mh: The maestro hub

    :rtype: list(tuple(signal, callable))
    """
    signals_and_slots = [
        (mh.projectOpened, self._projectOpened),
    ]  # yapf: disable
    return signals_and_slots


@QtCore.pyqtSlot()
def _projectOpened(self):
    """
    Emit `projectLoadRequested`.
    """
    self.projectLoadRequested.emit()


def _getWHSignalsAndSlots(self, wh):
    """
    List the workspace hub signals this model uses, paired with the slot
    for each one.

    :param wh: The workspace hub

    :rtype: list(tuple(signal, callable))
    """
    signals_and_slots = [
        (wh.ligandAtomsChanged, self._ligandAtomsChanged),
        (wh.ligandResiduesChanged, self._ligandResiduesChanged),
        (wh.inclusionChanged, self._inclusionChanged),
        (wh.residuesUpdated, self._residuesUpdated),
        (wh.residueDisplayChanged, self._residueDisplayChanged),
        (wh.residueSelectionChanged, self._residueSelectionChanged),
    ]  # yapf: disable
    return signals_and_slots


def _initEidsInWorkspaceAln(self, eids):
    """
    Add sequences for the specified entry ids to the workspace alignment.
    This method should only be called for entries that have never been
    previously added to the workspace alignment. If an entry has been
    previously added to the workspace alignment, then instead add
    _ChainData.workspace_seq to the alignment for all chains in the entry.

    :param eids: The entry ids to add
    :type eids: iterable
    """
    # Ignore any scratch entries and make sure that all eids are ints
    # since that's how WorkspaceHub provides them
    int_eids = []
    for eid in eids:
        if isinstance(eid, int) or eid.isdigit():
            int_eids.append(int(eid))
    vis_res = self._getVisResIfNeeded(int_eids)
    for cur_eid in int_eids:
        # _getSeqsForEid will populate self._entry_chain_map if needed
        entry_seqs = self._getSeqsForEid(cur_eid, vis_res)
        for cur_seq in entry_seqs:
            chain_name = cur_seq.structure_chain
            chain_data = self._entry_chain_map[cur_eid][chain_name]
            chain_data.workspace_seq = cur_seq
        self._workspace_aln.addSeqs(entry_seqs, replace_selection=True)


def _getVisResIfNeeded(self, eids):
    """
    If this structure model isn't yet tracking any of the specified
    entries, return information about what residues are currently visible
    in the workspace. Otherwise, return None.

    :param eids: A list of entry ids
    :type eids: iterable

    :return: A dictionary of [entry id as integer][chain name] = set of
        residues currently visible in the workspace, or None
    :rtype: defaultdict or NoneType
    """
    untracked_eids = set(eids).difference(self._entry_chain_map.keys())
    if untracked_eids:
        return self._getWorkspaceVisRes()
    return None
def getSeqsForEid(self, eid):
    """
    Fetch a sequence for each chain of the specified entry.

    :param eid: The entry id to fetch sequences for.
    :type eid: int or str

    :return: A list of the requested sequences.
    :rtype: list
    """
    # Make sure eid is an int since that's how WorkspaceHub provides them
    int_eid = int(eid)
    vis_res = self._getVisResIfNeeded([int_eid])
    seqs = self._getSeqsForEid(int_eid, vis_res)
    return seqs
def getSeqsForEids(self, eids, *, ignore_missing=False):
    """
    Fetch a sequence for each chain of every specified entry.

    :param eids: The entry ids to fetch sequences for.
    :type eids: list

    :param ignore_missing: Whether we should ignore any entry ids that
        aren't present in the project. If this is False and an entry id is
        not present, a ValueError will be raised.
    :type ignore_missing: bool

    :return: A list of the requested sequences.
    :rtype: list

    :raise ValueError: If any of the specified eids are not present in the
        project and `ignore_missing` is `False`.
    """
    # Make sure the eids are ints since that's how WorkspaceHub provides
    # them
    int_eids = [int(eid) for eid in eids]
    vis_res = self._getVisResIfNeeded(int_eids)
    all_seqs = []
    for cur_eid in int_eids:
        try:
            entry_seqs = self._getSeqsForEid(cur_eid, vis_res)
        except ValueError:
            if ignore_missing:
                continue
            raise
        all_seqs.extend(entry_seqs)
    return all_seqs
def _readStructures(self, filename): """ Return sequences for the specified file, which must contain structural data. The sequences will have associated structures accessible via `sequence.getStructure()`. NOTE Two sets of sequences will be created for the structure. One set will be loaded into the workspace tab in response to new structures being loaded into Maestro, and one set will be used as a return value. NOTE If structures with the same title are found using NMR, then only the sequence for the structure with the lowest entry ID will be returned. For example, if the file has six 5z5q entries found using NMR with entry IDs 1-6, and one 1cmy entry not found using NMR, then two sequences will be returned: one sequence for one 5z5q entry (entry ID 1) and one sequence for 1cmy. This is done since files with NMR structures usually have multiple conformers which all have the same sequence. :param filename: The filename to read :type filename: str :return: All sequences :rtype: list(sequence.Sequence) """ rows = self._importStructuresIntoWorkspace(filename) rows = self._filterNMRRows(rows) eids = [row.entry_id for row in rows] return self.getSeqsForEids(eids) def _filterNMRRows(self, rows): """ Given a list of rows, filter out NMR conformers, keeping only one per shared entry title. See the notes in `_readStructures` for an example. :param rows: List of the project rows to filter. Rows should be in entry id order. :type rows: list(project.ProjectRow) :return: Filtered list of project rows :rtype: list(project.ProjectRow) """ nmr_titles = set() filtered = [] for row in rows: if 'NMR' in row.property.get('s_pdb_PDB_EXPDTA', ''): if row.title not in nmr_titles: filtered.append(row) nmr_titles.add(row.title) else: filtered.append(row) return filtered def _importStructuresIntoWorkspace(self, filename): """ Import all structures from the given file into the workspace and include only the first structure. Additionally selects all the new structures. 
If the structure is missing the title, the file base name is used as title. :param filename: The filename to read :type filename: str :return: list of the new project rows :rtype: list(project.ProjectRow) """ proj = maestro.project_table_get() strucs = structure.StructureReader(filename) rows = [] for st in strucs: if not st.title.strip(): st.title = fileutils.get_basename(filename) row = proj.importStructure(st) rows.append(row) # include the first new entry rows[0].in_workspace = project.IN_WORKSPACE # select all new entries for row in rows: row.is_selected = True return rows
def importStructuresIntoWorkspace(self, filename):
    """
    Import all structures from the given file into the workspace and
    return the workspace alignment sequences that belong to the newly
    imported entries. See AbstractStructureModel for further
    documentation.

    :param filename: The filename to read
    :type filename: str

    :return: Sequences from the workspace alignment that correspond to the
        newly imported structures.
    :rtype: list(sequence.Sequence)
    """
    new_rows = self._importStructuresIntoWorkspace(filename)
    new_eids = {row.entry_id for row in new_rows}
    imported_seqs = []
    for seq in self._workspace_aln:
        if seq.entry_id in new_eids:
            imported_seqs.append(seq)
    return imported_seqs
def getSelectedEntries(self):
    """
    Fetch sequences for every entry that is selected in the Maestro
    project. See AbstractStructureModel for further documentation.

    :return: A list of sequences
    :rtype: list
    """
    selected_rows = maestro.project_table_get().selected_rows
    selected_eids = [row.entry_id for row in selected_rows]
    return self.getSeqsForEids(selected_eids)
def getIncludedEntries(self):
    """
    Fetch sequences for every entry that is included in the workspace
    according to the Maestro project. See AbstractStructureModel for
    further documentation.

    :return: A list of sequences
    :rtype: list
    """
    included_rows = maestro.project_table_get().included_rows
    included_eids = [row.entry_id for row in included_rows]
    return self.getSeqsForEids(included_eids)
def _getSeqsForEid(self, eid, vis_res):
    """
    Get sequences for each chain in the specified entry. Note that this
    method will populate `self._entry_chain_map` with data for entry `eid`
    if the entry is not already present.

    :param eid: The entry id to fetch sequences for.
    :type eid: int or str

    :param vis_res: If information about `eid` is not already stored in
        `self._entry_chain_map`, a dictionary of
        [entry id as integer][chain name] = set of residues currently
        visible in the workspace. If `self._entry_chain_map` already
        contains information about `eid`, may be None.
    :type vis_res: defaultdict or NoneType

    :return: A list of the requested sequences.
    :rtype: list

    :raise ValueError: If the specified eid is not present in the project
    """
    eid = int(eid)
    seqs, row, struc, proj = self._getUnlinkedSeqsForEid(eid)
    if eid not in self._entry_chain_map:
        self._entry_chain_map[eid] = self._initDataForEntry(
            eid, row, struc, seqs, vis_res)
    self._addSeqsToChainData(seqs, eid, proj)
    return seqs


def _getUnlinkedSeqsForEid(self, eid):
    """
    Get sequences that correspond to the structure for the specified project
    entry. These sequences will not be monitored by the structure model in
    any way.

    :param eid: The entry id to fetch sequences for
    :type eid: str or int

    :return: A tuple of
        - A list of the requested sequences
        - The ProjectRow for the specified entry id
        - The structure for the specified entry id
        - The Maestro project
    :rtype: tuple(list(sequence.Sequence), project.ProjectRow,
        structure.Structure, project.Project)

    :raises ValueError: If the entry id is not found.
    """
    proj = maestro.project_table_get()
    try:
        row = proj[eid]
    except KeyError as err:
        # Chain explicitly so the traceback shows the failed lookup as the
        # cause rather than as an error raised while handling another error.
        raise ValueError("Entry id %s not found" % eid) from err
    struc = row.getStructure()
    seqs = self._convertStructure(struc)
    for seq in seqs:
        # Update seqs with project row name
        seq.name = row.title
    return seqs, row, struc, proj


def _convertStructure(self, st, *args, **kwargs):
    # See parent class for method documentation
    seqs = super()._convertStructure(st, *args, **kwargs)
    # record any new residue names so that we'll recognize them if the
    # WorkspaceHub emits signals about them
    resnames = {res.long_code for seq in seqs for res in seq}
    resnames -= {"UNK", "", None}
    self._valid_seqres_names.update(resnames)
    return seqs


def _addSeqsToChainData(self, seqs, eid, proj):
    """
    Set the structure getter and setter on all sequences and add them to the
    appropriate `_ChainData` object.

    :param seqs: All sequences to process
    :type seqs: list[sequence.Sequence]

    :param eid: The entry id of the sequences
    :type eid: int

    :param proj: The Maestro project
    :type proj: project.Project
    """

    # We can't use row.getStructure here because ProjectRow objects are
    # based on entry index, which can become stale when the project
    # changes. Instead, we create closures that fetch the row based on
    # entry id, which never stales.
    def get_struc():
        return proj[eid].getStructure()

    def set_struc(struc):
        return proj[eid].setStructure(struc)

    for cur_seq in seqs:
        self._entry_chain_map[eid][cur_seq.structure_chain].addSeq(cur_seq)
        cur_seq._get_structure = get_struc
        cur_seq._set_structure = set_struc


def _getWorkspaceVisRes(self):
    """
    Determine which residues are currently in the workspace.

    :return: A dictionary of
        [entry id as integer][chain name] = set of residues currently
        visible in the workspace.
    :rtype: defaultdict
    """
    vis_residues = self._workspace_hub.getDispAtomsToResSet()
    vis_res_by_chain = defaultdict(lambda: defaultdict(set))
    for whres in vis_residues:
        # Skip anything that doesn't belong in a sequence
        if not self._isSeqRes(whres):
            continue
        vis_res_by_chain[whres.getEntryID()][whres.getChain()].add(
            self._getKeyFromWHResidue(whres).chainKey())
    return vis_res_by_chain


def _initDataForEntry(self, eid, row, struc, seqs, vis_res):
    """
    Create `_ChainData` objects for all chains in the specified entry.

    :param eid: The entry id to generate `_ChainData` objects for.
    :type eid: int

    :param row: The project table row for the specified entry.
    :type row: `project.ProjectRow`

    :param struc: The structure for the specified entry.
    :type struc: `structure.Structure`

    :param seqs: A list of all sequences for the specified entry.
    :type seqs: list

    :param vis_res: A dictionary of
        [entry id as integer][chain name] = set of residues currently
        visible in the workspace.
    :type vis_res: defaultdict

    :return: A dictionary of {chain name: `_ChainData` object}
    :rtype: dict
    """
    included = row.in_workspace != project.NOT_IN_WORKSPACE
    # One WHResidue is built per atom index; the per-chain sets of chain
    # keys below collapse atoms belonging to the same residue.
    residues = {
        maestro_ui.WHResidue(struc.handle, i)
        for i in range(1, struc.atom_total + 1)
    }
    res_by_chain = defaultdict(set)
    for whres in residues:
        if not self._isSeqRes(whres):
            continue
        res_by_chain[whres.getChain()].add(
            self._getKeyFromWHResidue(whres).chainKey())
    data = _EntryData(seqres.has_seqres(struc))
    for cur_seq in seqs:
        chain = cur_seq.structure_chain
        chain_data = self._createChainData(eid, chain, res_by_chain[chain],
                                           vis_res[eid][chain], included)
        data[chain] = chain_data
    return data


def _createChainData(self, eid, chain, all_res, vis_res, included):
    """
    Create a new `_ChainData` object and connect all required signals. See
    `_ChainData.__init__` for argument documentation

    :return: The newly created `_ChainData` object.
    :rtype: _ChainData
    """
    chain_data = _ChainData(eid, chain, all_res, vis_res, included)
    chain_data.wsVisibilityChangeRequested.connect(
        self._setWorkspaceVisibility)
    return chain_data


def _isSeqRes(self, whres):
    """
    Determine if the specified residue should be included in a sequence.
    Only protein and nucleic acid residues are included. Solvents, ions,
    ligands, and others are excluded.

    :param whres: The residue to include or exclude
    :type whres: maestro_ui.WHResidue

    :return: True if the residue should be included. False otherwise.
    :rtype: bool
    """
    return whres.getPDBName() in self._valid_seqres_names
def getWorkspaceAlignment(self):
    # See AbstractStructureModel for method documentation
    # The alignment handed out is the one stored as _split_workspace_aln.
    split_chain_aln = self._split_workspace_aln
    return split_chain_aln
def _getLinkedSequences(self, aln):
    """
    Collect the sequences of an alignment that are linked to a Maestro
    entry, i.e. whose entry id can be converted to an integer. No check is
    made that the entry still exists in the Maestro project or that it still
    contains the corresponding chain.

    :param aln: Alignment
    :type aln: schrodinger.protein.alignment.BaseAlignment

    :return: Existing sequences that have corresponding entries
    :rtype: list(sequence.Sequence)
    """
    # TODO MSV-1982 mapping may need to include project name
    linked = []
    for candidate in aln:
        try:
            int(candidate.entry_id)
        except (ValueError, TypeError):
            # No usable entry id, so the sequence isn't linked
            pass
        else:
            linked.append(candidate)
    return linked


def _projectClose(self):
    """
    Handle the project closing: flag that a close is in progress, request a
    project save, empty the workspace alignment, and drop all sequence data
    stored in this class.
    """
    # The flag is set before anything else so that it is already in place
    # when the save request is emitted and the alignment is cleared.
    self._closing_project = True
    self.projectSaveRequested.emit(True)
    self._workspace_aln.clear()
    self._resetEntryChainMap()
    # The undo stack will be cleared by the panel when it resets
def getMsvAutosaveProjectName(self):
    """
    Build the path of the file that MSV projects are autosaved to and
    autoloaded from: `project.msv2` inside the additional data directory
    reported by the Maestro project table.

    :rtype: str
    """
    proj_table = maestro.project_table_get()
    # The project table is synchronized before its data directory is read
    maestro.project_table_synchronize()
    return os.path.join(proj_table.getAdditionalDataDir(), 'project.msv2')
def _resetEntryChainMap(self):
    """
    Call `chainRemoved` on every chain data object stored in
    `self._entry_chain_map`, then empty both the entry/chain map and the set
    of entry ids awaiting delayed synchronization.
    """
    for chains in self._entry_chain_map.values():
        for chain_data in chains.values():
            chain_data.chainRemoved()
    self._entry_chain_map.clear()
    self._delayed_sync_eids.clear()


# NOTE(review): the skip_if decorators presumably turn this method into a
# no-op while the named attribute is truthy - confirm against util.skip_if.
@util.skip_if("_renaming_project_entries")
@util.skip_if("_changing_maestro_visibility")
@util.skip_if("_updating_seqres")
def _projectChanged(self):
    """
    If an entry was just removed from the project, stop monitoring it.
    Afterwards, check the remaining monitored entries for title changes via
    `_checkProjectTableForRenames`.
    """
    try:
        proj = maestro.project_table_get()
    except project.ProjectException:
        # The project is currently closed
        return
    self._closing_project = False
    # Iterate over a copy of the items since entries are deleted from the
    # map inside the loop.
    for cur_eid, chains in list(self._entry_chain_map.items()):
        if cur_eid not in proj:
            for chain_data in chains.values():
                if chain_data.visibility != Inclusion.Excluded:
                    ws_aln = self._workspace_aln
                    ws_seq = chain_data.workspace_seq
                    # Anchors are cleared before the reference sequence
                    # itself is removed from the workspace alignment.
                    if ws_seq == ws_aln.getReferenceSeq():
                        ws_aln.clearAnchors()
                    ws_aln.removeSeq(ws_seq)
                chain_data.chainRemoved()
            del self._entry_chain_map[cur_eid]
            self._delayed_sync_eids.discard(cur_eid)
    self._checkProjectTableForRenames()


def _checkProjectTableForRenames(self):
    """
    Check whether Project Table entries linked to chains have been renamed.
    Workspace sequences whose name differs from the entry title are renamed
    directly; other linked sequences are reported through the
    `seqProjectTitlesChanged` signal. `self._request_immediate_rename` and
    `self._name_synch_aln` are reset at the end.

    Note: Assumes the caller has already verified the presence of the
    Project Table and that all currently stored entry IDs are present in it.
    """
    pt = maestro.project_table_get()
    ws_aln = self.getWorkspaceAlignment()
    # {sequence: new title} for non-workspace and workspace sequences
    new_name_seqs = {}
    new_name_ws_seqs = {}
    for eid, chains in self._entry_chain_map.items():
        row = pt[eid]
        for chain in chains.values():
            ws_seq = chain.workspace_seq
            # Don't need to rename ws seqs that aren't in the ws aln
            # (e.g. linked but excluded)
            if ws_seq is not None and ws_seq in ws_aln:
                if row.title != ws_seq.name:
                    new_name_ws_seqs[ws_seq] = row.title
            for seq in chain.seqs:
                # Sequences that were explicitly unsynched are skipped, as
                # are sequences outside the alignment that requested the
                # rename (when one was recorded).
                if seq in self.unsynched_seqs:
                    continue
                elif self._name_synch_aln is not None and seq not in self._name_synch_aln:
                    continue
                if row.title != seq.name:
                    new_name_seqs[seq] = row.title
    with self._renamingProjectEntries():
        if new_name_ws_seqs:
            for seq, new_name in new_name_ws_seqs.items():
                self.renameSeq(seq, new_name)
        if new_name_seqs:
            self.seqProjectTitlesChanged.emit(
                new_name_seqs, self._request_immediate_rename)
    self._request_immediate_rename = False
    self._name_synch_aln = None
def getLinkedAlnSeqs(self, seq):
    """
    Find the sequences that are linked to the same entry ID as `seq`.

    If `seq` belongs to an alignment known to the gui model, the result
    holds `seq` plus every synched sequence in that alignment with the same
    entry id and name. Otherwise, if `seq` is in the workspace alignment,
    the result holds `seq` plus every workspace sequence with the same entry
    id. Unsynched sequences and sequences without an entry id have no linked
    sequences.

    :param seq: Split-chain sequence to get a linked sequence set for
    :type seq: sequence.ProteinSequence

    :return: Set of all sequences in the alignment with the same name linked
        to the entry ID.
    :rtype: set(sequence.ProteinSequence)
    """
    if seq in self.unsynched_seqs:
        return set()
    eid = seq.entry_id
    if eid is None or eid == '':
        return set()
    aln = self._gui_model.getAlignmentOfSequence(seq)
    linked = {seq}
    if aln is None:
        # Fall back to the workspace alignment, matching on entry id only
        ws_aln = self.getWorkspaceAlignment()
        if seq in ws_aln:
            linked.update(other for other in ws_aln if other.entry_id == eid)
        return linked
    linked.update(
        other for other in aln
        if other not in self.unsynched_seqs and other.entry_id == eid and
        other.name == seq.name)
    return linked
def unsynchEntryID(self, eid):
    """
    Unsynch all non-Workspace sequences for a specified entry ID from the
    Workspace. Every sequence stored for every chain of the entry is added
    to `self.unsynched_seqs`. Entry ids that aren't being tracked are
    ignored.

    :param eid: Entry ID to unsynchronize.
    :type eid: int
    """
    # _entry_chain_map[eid] is a {chain name: chain data} mapping, so we
    # iterate over its values; iterating over the mapping itself would
    # yield the chain name strings instead.
    chains = self._entry_chain_map.get(eid)
    if chains is None:
        return
    for chain_data in chains.values():
        self.unsynched_seqs.update(chain_data.seqs)
def renameSeq(self,
              seq,
              new_name,
              rename_linked_seqs=False,
              rename_entry=False):
    """
    Give the specified sequence a new name.

    A sequence from the Workspace alignment is renamed there and the linked
    Project entry is renamed as well. Any other sequence is renamed through
    the alignment it belongs to; the renamed sequences are then either
    marked as unsynched or, if `rename_entry` is set, the linked Project
    entry is renamed too.

    :param seq: Sequence to rename
    :type seq: sequence.ProteinSequence

    :param new_name: New name for the sequence
    :type new_name: str

    :param rename_linked_seqs: Whether to rename linked sequences from the
        same alignment. Will be ignored if the seq is from the Workspace
        alignment.
    :type rename_linked_seqs: bool

    :param rename_entry: Whether to rename the linked Project entry. Will be
        ignored if the seq is from the Workspace alignment.
    :type rename_entry: bool
    """
    ws_aln = self.getWorkspaceAlignment()
    if seq in ws_aln:
        ws_aln.renameSeq(seq, new_name)
        self.renameProjectEntry(seq.entry_id, new_name, ws_aln)
        return

    aln = self._gui_model.getAlignmentOfSequence(seq)
    # The full set of targets is gathered before any renaming happens,
    # since linked sequences are matched on the current name.
    targets = {seq}
    if rename_linked_seqs:
        targets.update(
            other for other in aln
            if other.entry_id == seq.entry_id and other.name == seq.name)
    for target in targets:
        aln.renameSeq(target, new_name)
    if rename_entry:
        self.renameProjectEntry(seq.entry_id, new_name, aln)
    else:
        self.unsynched_seqs.update(targets)
@util.skip_if('_renaming_project_entries')
def renameProjectEntry(self, eid, new_title, aln=None):
    """
    Set a new title on the specified Project Table entry and update the
    Project Table.

    The given alignment is remembered in `self._name_synch_aln` and an
    immediate rename is flagged in `self._request_immediate_rename` before
    the title is changed. If an alignment is specified, other sequences
    related to this entry and alignment get a rename request as well.

    :param eid: The entry id of the Project Table entry to rename
    :param new_title: The new title for the entry
    :param aln: The alignment that the rename originated from, if any
    """
    # Both attributes are set before the project table is touched.
    self._request_immediate_rename = True
    self._name_synch_aln = aln
    proj_table = maestro.project_table_get()
    proj_table[eid].title = new_title
    proj_table.update()
@QtCore.pyqtSlot(int, str, bool, bool) def _setWorkspaceVisibility(self, eid, chain, visible, already_included): """ Show or hide the specified chain in the workspace. If showing a chain that's not currently included in the workspace, then the entry will be included and all other chains will be hidden. :param eid: The entry to show or hide. :type eid: int :param chain: The chain to show or hide. :type chain: str :param visible: Whether the chain should be shown (True) or hidden (False). :type visible: bool :param already_included: Whether the entry is already included in the workspace or not. :type already_included: bool """ if already_included: struc = maestro.workspace_get() atom_nums = self._getWorkspaceAtoms(struc, eid, chain) with self._changingMaestroVisibility(): if visible: self._workspace_hub.displayAtomsAdd(atom_nums) else: self._workspace_hub.displayAtomsRemove(atom_nums) elif visible: proj = maestro.project_table_get() with self._changingMaestroVisibility(): proj[eid].in_workspace = project.IN_WORKSPACE # make sure we fetch the workspace structure after including the # entry, not before struc = maestro.workspace_get() atom_nums_to_show = self._getWorkspaceAtoms(struc, eid, chain, True) atom_nums_to_hide = self._getWorkspaceAtoms(struc, eid, chain, False) with self._changingMaestroVisibility(): self._workspace_hub.displayAtomsRemove(atom_nums_to_hide) self._workspace_hub.displayAtomsAdd(atom_nums_to_show) # Update inclusion for other chains in this entry entry_data = self._entry_chain_map[eid] for chain_name, chain_data in entry_data.items(): if chain_name != chain: chain_data.clearVisRes() chain_data.included = True if chain_data.workspace_seq is None: msg = f"{eid}{chain_name} has no workspace seq" raise ValueError(msg) self._workspace_aln.addSeq(chain_data.workspace_seq) else: raise RuntimeError("Trying to hide a chain that isn't in the " "workspace.") def _getWorkspaceAtoms(self, struc, eid, chain, want_chain=True): """ Get all workspace atom numbers 
that either - belong to the specified chain - belong to anything other than the specified chain :param struc: The workspace structure. :type struc: `structure.Structure` :param eid: The entry id of the specified chain. :type eid: int :param chain: The specified chain. :type chain: str :param want_chain: Whether to return all atoms in the specified chain (True) or all atoms not in the specified chain (False) :type want_chain: bool :return: A list of atom numbers. :rtype: list """ negation = "" if want_chain else "not " asl = ('entry.id %s and %schain.name "%s" and (protein or ' 'nucleic_acids)' % (eid, negation, chain)) return analyze.evaluate_asl(struc, asl) # @QtCore.pyqtSlot("QList<int>", "QList<int>") @util.skip_if("_closing_project") @util.skip_if("_changing_maestro_visibility") def _inclusionChanged(self, included, excluded): """ Update the workspace alignment when entry inclusion changes. :param included: A list of all entry ids that were just included. :type included: list[int] :param excluded: A list of entry ids that were just excluded. :type excluded: list[int] """ # We don't synchronize residue selection for newly included entries # until the user changes residue selection in either the workspace or # the MSV. _delayed_sync_eids keeps track of entries that need their # residue selection synchronized when that happens. self._delayed_sync_eids.update(included) self._delayed_sync_eids.difference_update(excluded) self._delayed_sync_eids.discard(SCRATCH_ENTRY_ID) # Remember these entries so we know to ignore them in _residuesChanged. self._inclusion_changing = set(included + excluded) self._setEntryInclusion(included, True) self._setEntryInclusion(excluded, False) # TODO: allow inclusion changes to be undone from the MSV (MSV-2192) self.undo_stack.clear() def _setEntryInclusion(self, eids, included): """ Update the workspace alignment when entry inclusion changes. :param eids: A list of entry ids that were either included or excluded. 
:type eids: list[int] :param included: Whether the entries were included (True) or excluded (False). :type included: bool """ eids_to_init, seqs_to_add, seqs_to_remove = \ self._parseEntryInclusion(eids, included) ws_aln = self._workspace_aln ref_seq = ws_aln.getReferenceSeq() if ref_seq is None: ref_seq_eid = None else: ref_seq_eid = int(ref_seq.entry_id) if not included and ref_seq_eid in eids: ws_aln.clearAnchors() if seqs_to_add: ws_aln.addSeqs(seqs_to_add) if seqs_to_remove: if not self._gui_model.getWorkspacePage().split_chain_view: # get the combined-chain sequences to remove (since # seqs_to_remove currently contains split-chain sequences) seqs_to_remove = [ seq for seq in ws_aln if int(seq.entry_id) in eids ] with self._syncingInclusion(): ws_aln.removeSeqs(seqs_to_remove) if eids_to_init: self._initEidsInWorkspaceAln(sorted(eids_to_init)) def _parseEntryInclusion(self, eids, included): """ Figure out what changes need to be made in the workspace alignment when entry inclusion changes. Note that this method does not make any changes in the alignment. See `_setEntryInclusion` for that. :param eids: A list of entry ids that were either included or excluded. :type eids: list(int) :param included: Whether the entries were included (True) or excluded (False). :type included: bool :return: A tuple of: - The entry ids of structures that need to be initialized for inclusion in the workspace alignment. - A list of split-chain sequences to add to the workspace alignment. - A list of split-chain sequences to remove from the workspace alignment. 
:rtype: tuple(set(int), list(sequence.ProteinSequence), list(sequence.ProteinSequence)) """ eids_to_init = set() seqs_to_add = list() seqs_to_remove = list() for cur_eid in eids: if cur_eid <= 0: # ignore scratch entries pass elif cur_eid in self._entry_chain_map: for cname, chain_data in self._entry_chain_map[cur_eid].items(): chain_data.included = included if chain_data.workspace_seq is None: eids_to_init.add(cur_eid) continue ws_seq = chain_data.workspace_seq if included: seqs_to_add.append(ws_seq) else: seqs_to_remove.append(ws_seq) elif included: eids_to_init.add(cur_eid) else: # trying to exclude an eid not found in self._entry_chain_map # We don't have to do anything since the chain to exclude has # already been removed. This can happen after Maestro undoes # the importing of an entry into the workspace. pass # Deliberately left here to record intention. return eids_to_init, seqs_to_add, seqs_to_remove def _getKeyFromWHResidue(self, whres): """ Turn a workspace hub residue object into a key that can uniquely identify it in the workspace. This key is used to help map between residues in the MSV workspace alignment and residues in the maestro workspace. 
:param whres: the residue to turn into a key :type whres: maestro_ui.WHResidue :returns: a unique key representing the residue :rtype: residue.ResidueKey """ # TODO MSV-2379: Consider all items in WHResidue.d_hash return residue.ResidueKey(int(whres.getEntryID()), whres.getChain(), whres.getResNum(), whres.getInsCode()) def _getKeyFromStructureResidue(self, structure_res): first_atom = next(iter(structure_res.atom)) eid = first_atom.entry_id ch = structure_res.chain resnum = structure_res.resnum inscode = structure_res.inscode return residue.ResidueKey(int(eid), ch, resnum, inscode) # @QtCore.pyqtSlot("QSet<WHResidue>", "QSet<WHResidue>", # "QHash<WHResidue,QSet<WHResidue> >") @util.skip_if("_closing_project") @util.skip_if("_changing_maestro_visibility") @util.skip_if("_updating_seqres") @util.skip_if("_updating_color") def _residuesUpdated(self, removed, added, updated): """ Update sequences in response to any workspace residue additions, removals, or mutations. :param removed: Residues that were removed from the workspace structure. :type removed: set[maestro_ui.WHResidue] :param added: Residues that were added to the workspace structure. :type added: set[maestro_ui.WHResidue] :param updated: Residues that were modified in the workspace structure, given as a dictionary of {old residue: set of new residues}. :type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue)) """ # Changing inclusion triggers a residuesChanged signal, so we filter out # any entry ids that have been included or excluded. We also ignore -1, # which is the scratch entry id. 
eids_to_ignore = self._inclusion_changing | {SCRATCH_ENTRY_ID} self._inclusion_changing.clear() (new_by_chain, deleted_by_chain, mutated_by_chain, added_chains, chain_renames, modified_eids) = \ self._parseUpdatedResidues(added, removed, updated, eids_to_ignore) self._createNewChains(added_chains) self._deleteRemovedResidues(deleted_by_chain) self._mutateResidues(mutated_by_chain) self._insertNewResidues(new_by_chain) self._deleteEmptyChains(deleted_by_chain) self._renameChains(chain_renames) self._updateSeqres(modified_eids) if modified_eids: self.undo_stack.clear() def _parseUpdatedResidues(self, added, removed, updated, eids_to_ignore): """ Create lists of new, removed, and mutated residues by chain. :param added: Residues that were added to the workspace structure. :type added: list[maestro_ui.WHResidue] :param removed: Residues that were removed from the workspace structure. :type removed: list[maestro_ui.WHResidue] :param updated: Residues that were modified in the workspace structure, given as a dictionary of {old residue: set of new residues}. :type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue)) :param eids_to_ignore: A set of entry ids that we should exclude from the return values. Used for entries that have been included or excluded from the workspace or for scratch entries. 
:type eids_to_ignore: set(int) :return: A tuple of: - New residues for existing sequences, reported as {(entry id, chain): a set of NewResInfo objects} - Deleted residues, reported as {(entry id, chain): a dictionary of {(residue number, insertion code): residue name}} - Mutated residues, reported as {(entry id, chain): a dictionary of {(residue number, insertion code): new residue name}} - New residues for new chains, reported as {(entry id, chain): a set of NewResInfo objects} - Chain renames, reported as a list of (entry id, old chain name, new chain name) tuples - Entry ids of all modified entries :rtype: tuple(defaultdict, defaultdict, defaultdict, defaultdict, list, set) """ modified_eids = set() (added_from_updates, removed_from_updates, mutated_by_chain, chain_renames) = self._parseModifiedResidues(updated, eids_to_ignore, modified_eids) added_res = self._whresSet(added, eids_to_ignore) added_res.update(added_from_updates) new_by_chain, added_chains = self._parseAddedResidues( added_res, modified_eids) removed_res = self._whresSet(removed, eids_to_ignore) removed_res.update(removed_from_updates) deleted_by_chain = self._parseRemovedResidues(removed_res, modified_eids) return (new_by_chain, deleted_by_chain, mutated_by_chain, added_chains, chain_renames, modified_eids) def _parseModifiedResidues(self, updated, eids_to_ignore, modified_eids): """ Parse the updated residues reported by the WorkspaceHub's residuesUpdated signal. :param updated: The updated residues. :type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue)) :param eids_to_ignore: A set of entry ids that we should exclude from the return values. :type eids_to_ignore: set(int) :param modified_eids: A set of entry ids for modified residues. Will be updated with the entry ids of any updated residues. :type modified_eids: set(int) :return: A tuple of - Residues that should be handled as new residues. - Residues that should be handled as deleted residues. 
- Residues that have been mutated (The residue type changed, but residue number, insertion code, etc remained the same). Given as {(entry id, chain): a dictionary of {(residue number, insertion code): new residue name}} - Chains that have been renamed. Given as a list of (entry id, old chain name, new chain name) tuples. :rtype: tuple(set(WHResInfo), set(WHResInfo), defaultdict(tuple(int, str), defaultdict(tuple(int, str), str)), list(tuple(int, str, str))) """ res_to_remove = set() res_to_add = set() mutated_by_chain, chain_renamed_res = \ self._parseMutationsAndPotentialChainRenames( updated, eids_to_ignore, modified_eids, res_to_remove, res_to_add) chain_renames = self._parseChainRenames(chain_renamed_res, res_to_remove, res_to_add) return (res_to_add, res_to_remove, mutated_by_chain, chain_renames) def _parseMutationsAndPotentialChainRenames(self, updated, eids_to_ignore, modified_eids, res_to_remove, res_to_add): """ Parse the updated residues reported by the WorkspaceHub's residuesUpdated signal for mutated residues and residues where only the chain name has changed. :param updated: The updated residues. :type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue)) :param eids_to_ignore: A set of entry idds that we should exclude from the return values. :type eids_to_ignore: set(int) :param modified_eids: A set of entry ids for modified residues. Will be updated with the entry ids of any updated residues. :type modified_eids: set(int) :param res_to_remove: A set of residues that should be handled as deleted residues. Will be updated based on the contents of `updated`. :type res_to_remove: set(WHResInfo) :param res_to_add: A set of residues that should be handled as new residues. Will be updated based on the contents of `updated`. :type res_to_add: set(WHResInfo) :return: A tuple of: - Residues that have been mutated (The residue type changed, but residue number, insertion code, etc remained the same). 
Given as {(entry id, chain): a dictionary of {(residue number, insertion code): new residue name}} - Residues where only the chain name has been changed and the new chain name didn't previously exist in the entry. Given as nested dictionaries of chain_renamed_res[entry_id][old_chain_name][new_chain_name] = list of (old_WHResInfo, new_WHResInfo) tuples :rtype: tuple( defaultdict(tuple(int, str), dict(tuple(int, str), str)), defaultdict(int, defaultdict(str, defaultdict( str, list[tuple(WHResInfo, WHResInfo)])))) """ mutated_by_chain = defaultdict(dict) chain_renamed_res = defaultdict( partial(defaultdict, partial(defaultdict, list))) for old_res, cur_updated in updated.items(): eid = old_res.getEntryID() if eid in eids_to_ignore or not self._isSeqRes(old_res): continue modified_eids.add(eid) old_res = WHResInfo.fromWHRes(old_res) cur_updated = self._whresSet(cur_updated, eids_to_ignore) if len(cur_updated) == 1: new_res = next(iter(cur_updated)) eid_and_resnum_match = (old_res.eid, old_res.resnum, old_res.inscode) == (new_res.eid, new_res.resnum, new_res.inscode) resname_match = (old_res.resname == new_res.resname) chain_match = (old_res.chain == new_res.chain) if (eid_and_resnum_match and resname_match and not chain_match and new_res.chain not in self._entry_chain_map[eid]): # is a potential chain rename eid = old_res.eid chain_renamed_res[eid][old_res.chain][new_res.chain].append( (old_res, new_res)) continue elif eid_and_resnum_match and chain_match and not resname_match: # is a mutation entry_key = old_res.entryKey() chain_key = old_res.chainKey() mutated_by_chain[entry_key][chain_key] = new_res.resname continue # is neither a mutation nor a potential chain rename if old_res in cur_updated: # only some of the atoms in the residue got updated, so the # residue itself still exists cur_updated.remove(old_res) else: res_to_remove.add(old_res) res_to_add.update(cur_updated) return mutated_by_chain, chain_renamed_res def _parseChainRenames(self, chain_renamed_res, 
res_to_remove, res_to_add): """ Given residues that have had their chain name changed, find chains where all structured residues have been moved to a single new chain (i.e. the whole chain has been renamed). :param chain_renamed_res: Residues where only the chain name has changed and there's no existing sequence for the new chain. Given as nested dictionaries of chain_renamed_res[entry_id][old_chain_name][new_chain_name] = list of (old_WHResInfo, new_WHResInfo) tuples :type chain_renamed_res: dict(int, dict(str, dict( str, list[tuple(WHResInfo, WHResInfo)])))) :param res_to_remove: A set of residues that should be handled as deleted residues. Will be updated based on the contents of `updated`. :type res_to_remove: set(WHResInfo) :param res_to_add: A set of residues that should be handled as new residues. Will be updated based on the contents of `updated`. :type res_to_add: set(WHResInfo) :return: Chains that have been renamed. Given as a list of (entry id, old chain name, new chain name) tuples. 
:rtype: list(tuple(int, str, str)) """ chain_renames = [] for eid, renamed_chains in chain_renamed_res.items(): for old_chain_name, new_chains in renamed_chains.items(): if len(new_chains) == 1: new_chain_name, new_chain = next(iter(new_chains.items())) seq = \ self._entry_chain_map[eid][old_chain_name].workspace_seq if len(new_chain) == seq.structuredResidueCount(): # All structured residues in the old chain were moved to # the same new chain, so this is a chain rename chain_renames.append( (eid, old_chain_name, new_chain_name)) continue # This isn't a chain rename, so we instead want to manually # remove residues from the old chain and create a new chain with # new residue objects for new_chain_name, updated_res in new_chains.items(): for old_res, new_res in updated_res: res_to_remove.add(old_res) res_to_add.add(new_res) return chain_renames def _parseAddedResidues(self, added_res, modified_eids): """ Parse the list of added residues to determine which residues have been added to existing chains and which have been added to new chains. :param added_res: Residues that were added to the workspace structure. :type added_res: set(WHResInfo) :param modified_eids: A set of entry ids for modified residues. Will be updated with the entry ids of any added residues. 
:type modified_eids: set(int) :return: A tuple of: - New residues for existing sequences, reported as {(entry id, chain): a set of NewResInfo objects} - New residues for new chains, reported as {(entry id, chain): a set of NewResInfo objects} :rtype: tuple(defaultdict(tuple(int, str), set(NewResInfo)), defaultdict(tuple(int, str), set(NewResInfo))) """ new_by_chain = defaultdict(set) added_chains = defaultdict(set) for res_info in added_res: entry_key = res_info.entryKey() chain_key = res_info.chainKey() resname = res_info.resname modified_eids.add(res_info.eid) if res_info.chain in self._entry_chain_map[res_info.eid]: new_by_chain[entry_key].add(NewResInfo(*chain_key, resname)) else: added_chains[entry_key].add( NewResInfo(*chain_key, resname, res_info.is_na)) return new_by_chain, added_chains def _parseRemovedResidues(self, removed_res, modified_eids): """ Convert a list of removed residues to a dictionary organized by entry and chain information. :param removed_res: Residues that were added to the workspace structure. :type removed_res: set(WHResInfo) :param modified_eids: A set of entry ids for modified residues. Will be updated with the entry ids of any added residues. :type modified_eids: set(int) :return: Deleted residues, reported as {(entry id, chain): a dictionary of {(residue number, insertion code): residue name}} :rtype: defaultdict(tuple(int, str), dict(tuple(int, str), str)) """ deleted_by_chain = defaultdict(dict) for res_info in removed_res: entry_key = res_info.entryKey() chain_key = res_info.chainKey() deleted_by_chain[entry_key][chain_key] = res_info.resname modified_eids.add(res_info.eid) return deleted_by_chain def _whresSet(self, residues, eids_to_ignore): """ Convert a list of residues into a set of tuples that describe the residues. Note that WHResidue objects are hashable, but equality is defined using identity, so subtracting two sets of WHResidues directly doesn't give the expected results. 
The sets returned by this method avoid that problem. :param residues: The list of residues to convert :type residues: list[maestro_ui.WHResidue] :param eids_to_ignore: A set of entry ids that we should exclude from the return values. Used for entries that have been included or excluded from the workspace or for scratch entries. :type eids_to_ignore: set(int) :return: A set of (entry id, chain, residue number, insertion code, residue name, and whether the res is protein) :rtype: set(WHResInfo) """ res_set = set() for whres in residues: eid = whres.getEntryID() if eid not in eids_to_ignore and self._isSeqRes(whres): res_data = WHResInfo.fromWHRes(whres) res_set.add(res_data) return res_set def _createNewChains(self, added_chains): """ Create new sequences and add them to the workspace alignment for all chains that were added to the workspace structure. :param added_chains: A dictionary of all residues to add, given as {(entry id, chain): a set of NewResInfo objects} :type added_chains: dict """ for (eid, chain), res_info in added_chains.items(): all_res = {res.chainKey() for res in res_info} vis_res = self._getWorkspaceVisRes() chain_data = self._createChainData(eid, chain, all_res, vis_res[eid][chain], True) self._entry_chain_map[eid][chain] = chain_data seq = self._createSeqForNewChain(res_info, eid, chain) self._addSeqsToChainData([seq], eid, maestro.project_table_get()) chain_data.workspace_seq = seq self._workspace_aln.addSeq(seq) def _createSeqForNewChain(self, res_info, eid, chain): """ Create a sequence object describing a chain that was just added to the workspace structure. :param res_info: The new residues to add to the sequence :type res_info: set(NewResInfo) :param eid: The entry id of the new chain :type eid: int :param chain: The chain name of the new chain :type chain: str :return: The newly created sequence :rtype: sequence.Sequence :note: This method orders new sequences based on residue number and insertion code, not connectivity. 
That is identical to the behavior of `seqio.StructureConverter._extractChains`. If that method ever changes, this method should be updated as well to preserve the consistency. """ # decide if chain is nucleic acid or protein res_names = {info.resname for info in res_info} SeqClass = sequence.guess_seq_type(res_names) new_seqres = [] for cur_res_info in sorted(res_info): res = SeqClass.makeSeqElement(cur_res_info.resname) res.resnum = cur_res_info.resnum res.inscode = cur_res_info.inscode new_seqres.append(res) # we pull name, long_name, PDB ID, etc from an arbitrary sequence from the # same entry chain_data = next(iter(self._entry_chain_map[eid].values())) other_seq = chain_data.workspace_seq return SeqClass(new_seqres, name=other_seq.name, chain=chain, structure_chain=chain, long_name=other_seq.long_name, entry_id=eid, entry_name=other_seq.entry_name, pdb_id=other_seq.pdb_id, origin=SeqClass.ORIGIN.Maestro) def _deleteRemovedResidues(self, deleted_by_chain): """ Delete all sequence residues that were removed from the workspace structure. :param deleted_by_chain: A dictionary of all residues to remove, given as {(entry id, chain): a dictionary of {(residue number, insertion code): residue name}} :type deleted_by_chain: dict """ for (eid, chain), deleted_res in deleted_by_chain.items(): chain_data = self._entry_chain_map[eid][chain] for seq in chain_data.seqs: seq_res_to_remove = [] for seq_res in seq: if seq_res.is_gap or not seq_res.hasSetResNum(): continue res_key = seq_res.getChainKey() deleted_code = deleted_res.get(res_key) if deleted_code is not None: if seq_res.long_code != deleted_code: res_info = "".join(map(str, res_key)) warnings.warn( seqio.SequenceWarning( f'Sequence residue {res_info} is ' f'{seq_res.long_code}, expected {deleted_code}' )) seq_res_to_remove.append(seq_res) # If the sequence belongs to an alignment, remove the residues # through the alignment api. 
seq_page_info = self._gui_model.getPageInfoForSequence(seq) if seq_page_info is None: seq.removeElements(seq_res_to_remove) else: aln = seq_page_info.aln if not seq_page_info.split_chain_view: seq_res_to_remove = list( map(aln.combinedResForSplitRes, seq_res_to_remove)) with aln.modifyingStructure(): with self._notifyMaestroIfAnchoredRemoved( aln, 'removed'): aln.removeAnchors(seq_res_to_remove) aln.removeElements(seq_res_to_remove) @contextlib.contextmanager def _notifyMaestroIfAnchoredRemoved(self, aln, why): maestro_hub = maestro_ui.MaestroHub.instance() # Lambda slots with references to QObjects may cause problems with # garbage collection. To avoid this, we replace maestro hub with a # weakref. maestro_hub = weakref.proxy(maestro_hub) slot = lambda: maestro_hub.emitAddBanner( f'Anchors were removed in MSV to adjust for {why} residues.', '', '', '') aln.signals.anchoredResiduesChanged.connect(slot) yield aln.signals.anchoredResiduesChanged.disconnect(slot) def _deleteEmptyChains(self, deleted_by_chain): """ If any chains are now empty, remove them from all alignments. :param deleted_by_chain: A dictionary of all residues that have been removed, given as {(entry id, chain): a dictionary of {(residue number, insertion code): residue name}}. Note that this method only pays attention to the keys of this dictionary, not the values. :type deleted_by_chain: dict """ for (eid, chain) in deleted_by_chain.keys(): chain_data = self._entry_chain_map[eid][chain] if chain_data.workspace_seq.hasStructuredResidues(): continue for seq in chain_data.seqs: seq_page_info = self._gui_model.getPageInfoForSequence(seq) if seq_page_info is not None: if seq_page_info.split_chain_view: seq_page_info.aln.removeSeq(seq) elif not seq_page_info.seq.hasStructuredResidues(): # The entire combined-chain sequence is empty, so we # remove it. This will also remove all the chains from # the split-chain alignment. 
seq_page_info.aln.removeSeq(seq_page_info.seq) else: # The chain is empty, but there are still residues in # other chains of the combined-chain sequence. if len(seq): # First remove any remaining gaps. If there are # downstream anchors, this will make sure that new # gaps get added to make up for the ones we're # removing. aln = seq_page_info.aln gaps_to_remove = list( map(aln.combinedResForSplitRes, seq)) seq_page_info.aln.removeElements(gaps_to_remove) # remove the chain from the split-chain alignment. seq_page_info.split_aln.removeSeq(seq) # Remove the chain from the combined-chain sequence. # This is done in a non-undoable manner, but we can't # undo this operation anyway because it involves # Maestro. We don't have to worry about anchoring here # since we know that the sequence is empty. seq_page_info.seq.removeChain(seq) chain_data.chainRemoved() del self._entry_chain_map[eid][chain] def _mutateResidues(self, mutated_by_chain): """ Mutate all sequence residues that were mutated in workspace structure :param mutated_by_chain: A dictionary of residues to mutate, given as {(entry id, chain): a dictionary of {(residue number, insertion code): new residue name}} :type mutated_by_chain: dict """ for (eid, chain), mutated_res in mutated_by_chain.items(): chain_data = self._entry_chain_map[eid][chain] for seq in chain_data.seqs: old_seq_res = set( res for res in seq if (not res.is_gap and res.getChainKey() in mutated_res)) seq_page_info = self._gui_model.getPageInfoForSequence(seq) if seq_page_info is not None: aln = seq_page_info.aln if seq_page_info.split_chain_view: to_unanchor = old_seq_res else: to_unanchor = set( map(aln.combinedResForSplitRes, old_seq_res)) with self._notifyMaestroIfAnchoredRemoved(aln, 'mutated'): aln.removeAnchors(to_unanchor) for res in old_seq_res: new_resname = mutated_res[res.getChainKey()] self._mutateRes(seq, res.idx_in_seq, res, new_resname) def _mutateRes(self, seq, index, cur_res, resname): """ Mutate the specified sequence residue 
:param seq: The sequence containing the residue to mutate :type seq: sequence.ProteinSequence :param index: The index of the residue to mutate :type index: int :param cur_res: The sequence residue object for the residue to mutate :type cur_res: residue.Residue :param resname: The residue name to mutate to :type resname: str """ mutated_seq_res = seq.makeSeqElement(resname) mutated_seq_res.resnum = cur_res.resnum mutated_seq_res.inscode = cur_res.inscode seq_page_info = self._gui_model.getPageInfoForSequence(seq) if seq_page_info is None: seq.mutate(index, index + 1, mutated_seq_res) else: seq_idx = seq_page_info.aln.index(seq_page_info.seq) if not seq_page_info.split_chain_view: index += seq_page_info.chain_offset cur_res = seq_page_info.seq[index] with seq_page_info.aln.modifyingStructure(): seq_page_info.aln.mutateResidues(seq_idx, index, index + 1, [mutated_seq_res]) def _insertNewResidues(self, new_by_chain): """ Insert sequence residues (or convert structureless sequence residues to structured) for all new residues in the workspace structure. :param new_by_chain: A dictionary of residues to insert, given as {(entry id, chain): a set of NewResInfo objects} :type new_by_chain: dict :note: This method assumes that sequences are ordered based on residue number and insertion code, not connectivity. That is currently the case for all sequences with structures due to the implementation of `seqio.StructureConverter._extractChains`. If that method ever changes, this method must be updated as well. 
""" for (eid, chain), new_residues_orig in new_by_chain.items(): new_residues_orig = sorted(new_residues_orig) chain_data = self._entry_chain_map[eid][chain] for seq in chain_data.seqs: new_residues = new_residues_orig.copy() # iterate backwards through the sequence and search for the # first residue that matches or is before the last item on our # new_residues list for seq_i, seq_res in reversed(list(enumerate(seq))): if seq_res.is_gap: continue if new_residues: new_res_info = new_residues[-1].chainKey() else: # we've inserted all of the new residues into this # sequence break seq_res_info = seq_res.getChainKey() if seq_res_info == new_res_info: if seq_res.seqres_only: # we're converting a structureless residue to # structured resname = new_residues[-1].resname if resname == seq_res.long_code: seq_res.seqres_only = False else: # the structured residue is of a different type, # so handle it as a mutation self._mutateRes(seq, seq_i, seq_res, resname) # If we're not adding a structure for a structureless # residue, then this residue is probably being reported # because of MAE-41133 and we can ignore it. new_residues.pop() elif seq_res_info < new_res_info: # list.insert(0, elem) is O(N), so we use a deque here # instead res_info_to_add = deque() while (new_residues and seq_res_info < new_residues[-1].chainKey()): # figure out if we need to insert more than one # residue here. res_info_to_add.appendleft(new_residues.pop()) self._addNewResToSeq(res_info_to_add, seq, seq_i + 1) else: if new_residues: # residues were added to the beginning of the structure self._addNewResToSeq(new_residues, seq, 0) def _addNewResToSeq(self, res_info_to_add, split_seq, index): """ Insert a new sequence residue at the specified position :param res_info_to_add: The residue number, insertion code, and residue name for the residue to add. :type res_info_to_add: NewResInfo :param split_seq: The sequence to insert the residue into. 
:type split_seq: sequence.ProteinSequence :param index: The sequence index to insert the new residue at. :type index: int """ res_to_add = [] for cur_res_info in res_info_to_add: res = split_seq.makeSeqElement(cur_res_info.resname) res.resnum = cur_res_info.resnum res.inscode = cur_res_info.inscode res_to_add.append(res) # If the sequence belongs to an alignment, add the residues # through the alignment api. seq_page_info = self._gui_model.getPageInfoForSequence(split_seq) if seq_page_info is None: split_seq.insertElements(index, res_to_add) else: if seq_page_info.split_chain_view: seq = split_seq else: seq = seq_page_info.seq index += seq.offsetForChain(split_seq) aln = seq_page_info.aln with self._notifyMaestroIfAnchoredRemoved(aln, 'inserted'): aln.removeAnchors(seq[index:]) aln.addElements(seq, index, res_to_add) def _renameChains(self, chain_renames): """ Rename the specified chains :param chain_renames: A list of (entry id, old chain name, new chain name) tuples for chains to rename :type chain_renames: list[tuple(int, str, str)] """ for eid, old_chain, new_chain in chain_renames: chain_data = self._entry_chain_map[eid][old_chain] for seq in chain_data.seqs: seq.chain = new_chain chain_data.chain = new_chain del self._entry_chain_map[eid][old_chain] self._entry_chain_map[eid][new_chain] = chain_data def _updateSeqres(self, modified_eids): """ Update SEQRES records for all specified entries. :param modified_eids: The entry ids for the entries to update. 
:type modified_eids: set(int) """ proj = maestro.project_table_get() for eid in modified_eids: entry_data = self._entry_chain_map[eid] if not entry_data.has_seqres: continue has_structureless = False for chain_data in entry_data.values(): # all of the sequences in a ChainData object are identical # except for gaps, so we only need to check one of them seq = next(iter(chain_data.seqs)) if any(res.seqres_only for res in seq if not res.is_gap): has_structureless = True break if has_structureless: cur_seqres = {} for chain, chain_data in sorted(entry_data.items()): seq = next(iter(chain_data.seqs)) cur_seqres[chain] = [ res.long_code for res in seq if not res.is_gap ] else: # There are no structureless residues, so the SEQRES records are # completely redundant with the structure itself. We clear the # SEQRES data so that we won't need to continue to keep them in # sync with the structure. cur_seqres = None entry_data.has_seqres = False struc = proj[eid].getStructure() seqres.set_seqres(struc, cur_seqres) with self._updatingSeqres(): # There's no need to sync the workspace since we've only changed # unstructured residues proj[eid].setStructure(struc, sync_workspace=False) def _getIncludedNonScratchEntryIDs(self): """ Return a list of non-scratch entry IDs included in the Workspace. :return: List of non-scratch entry IDs currently included in Workspace :rtype: list(int) """ all_eids = map(int, maestro.get_included_entry_ids()) return [e for e in all_eids if e > 0] def _getEIDsForAtomIndexesList(self, atom_indexes_list): """ Given a list of lists of Workspace atom indexes, return a generator of their entry IDs. :param atom_indexes_list: List of lists of atom indexes :type atom_indexes_list: list(list(int)) :return: Set of entry IDs for the atom indexes. 
:rtype: set(int) """ atom_idxs = itertools.chain(*atom_indexes_list) struc = maestro.workspace_get() eids = set() for idx in atom_idxs: try: eid = int(struc.atom[idx].entry_id) except ValueError: continue if eid > 0: eids.add(eid) return eids # @QtCore.pyqtSlot("QList<QList<int> >") @util.skip_if("_changing_maestro_visibility") def _ligandAtomsChanged(self, atom_indexes_list): """ Update sequences in response to any workspace ligand atom additions, removals, or mutations. :param atom_indexes_list: A list of list of indexes of atoms that were changed in workspace structures. :type atom_indexes_list: list """ if not atom_indexes_list: # MSV-1554 - WorkspaceHub emits empty list when all ligand atoms # are deleted. eids = self._getIncludedNonScratchEntryIDs() else: eids = self._getEIDsForAtomIndexesList(atom_indexes_list) self._ligandsChangedForEntryIDs(eids) def _getEIDsForResiduesList(self, residues_list): """ Given a list of residues, return a generator of their entry IDs. :param residues_list: List of residues to get entry IDs of :type residues_list: list(list(schrodinger.structure._Residue)) :return: Set of entry IDs :rtype: set(int) """ residues = itertools.chain(*residues_list) eids = set() for res in residues: try: eid = int(res.getEntryID()) except ValueError: continue if eid > 0: eids.add(eid) return eids # @QtCore.pyqtSlot("QList<QList<WHResidue> >") @util.skip_if("_changing_maestro_visibility") def _ligandResiduesChanged(self, residues_list): """ Update sequences in response to any workspace ligand residue additions, removals, or mutations. :param residues_list: A list of list of residues (`maestro_ui.WHResidue`) that were changed in workspace structures. :type residues_list: list """ if not residues_list: # MSV-1544 - WorkspaceHub passes an empty list when all ligand # residues have been deleted. 
eids = self._getIncludedNonScratchEntryIDs() else: eids = self._getEIDsForResiduesList(residues_list) self._ligandsChangedForEntryIDs(eids) def _ligandsChangedForEntryIDs(self, eids): """ Send onStructureChanged signals for the specified entry ids. :param eids: Entry IDs that have changed :type eids: iterable(int) """ for eid in eids: for chain_data in self._entry_chain_map[eid].values(): for seq in chain_data.seqs: seq.onStructureChanged() # @QtCore.pyqtSlot("QSet<WHResidue>", "QSet<WHResidue>") @util.skip_if("_closing_project") @util.skip_if("_changing_maestro_visibility") def _residueDisplayChanged(self, added, removed): """ Update sequence visibility in response to any workspace residues being shown or hidden. :param added: A set of residues (`maestro_ui.WHResidue`) that were shown in the workspace. :type added: set :param removed: A set of residues (`maestro_ui.WHResidue`) that were hidden in the workspace. :type removed: set """ # by_chain[entry id][chain name] = (set_of_added_residues, # set_of_removed_residues) by_chain = defaultdict(lambda: defaultdict(lambda: (set(), set()))) self._resByChain(by_chain, added, 0) self._resByChain(by_chain, removed, 1) for cur_eid, chains in by_chain.items(): for cur_chain, (added_chain, removed_chain) in chains.items(): try: chain_data = self._entry_chain_map[cur_eid][cur_chain] except KeyError: # It's possible that this entire chain was categorized as a # ligand (or some other type of residue that we don't care # about) by seqio.StructureConverter but not by # self._isSeqRes since StructureConverter is more thorough # about excluding things. If that's the case, there's # nothing to update and we can safely ignore information # about this chain. pass else: chain_data.updateVisRes(added_chain, removed_chain) def _resByChain(self, by_chain, residues, i): """ Organize a list of residues by entry and chain. 
:param by_chain: A dictionary of [entry id][chain name] = tuple of sets :type by_chain: defaultdict :param residues: A set of residues (`maestro_ui.WHResidue`) to organize. :type residues: list :param i: The index of the set that residues should be added to. :type i: int """ for whres in residues: if not self._isSeqRes(whres): continue eid = whres.getEntryID() if eid > 0: # ignore scratch entries chain = whres.getChain() by_chain[eid][chain][i].add( self._getKeyFromWHResidue(whres).chainKey()) # @QtCore.pyqtSlot("QSet<WHResidue>", "QSet<WHResidue>") @util.skip_if("_syncing_selection") @util.skip_if("_closing_project") @util.skip_if("_changing_maestro_visibility") def _residueSelectionChanged(self, selected, deselected): """ Update MSV residue selection in response to any workspace residues being selected or deselected. :param selected: A set of residues (`maestro_ui.WHResidue`) that were selected in the workspace. :type selected: set :param deselected: A set of residues (`maestro_ui.WHResidue`) that were deselected in the workspace. :type deselected: set """ selected_res_keys = [ self._getKeyFromWHResidue(whres) for whres in selected ] deselected_res_keys = [ self._getKeyFromWHResidue(whres) for whres in deselected ] with self._changingMaestroVisibility(), self._syncingSelection(): self._delayedSyncFromWorkspaceToMsv() self._setMSVResSelection(selected_res_keys, True) self._setMSVResSelection(deselected_res_keys, False) def _delayedSyncFromWorkspaceToMsv(self): """ When new entries are included in the workspace, their residue selection is not automatically synchronized until selection is changed in either the workspace or in the MSV. This method synchronizes residue selection for those entries by replacing residue selection in the MSV with residues selected in the workspace. This method should only be called from inside a `_syncingSelection` block. 
""" if not self._delayed_sync_eids: return ws_sel = self._workspace_hub.getSelAtomsToResSet() ws_sel_keys = [self._getKeyFromWHResidue(whres) for whres in ws_sel] ws_sel_keys = [ key for key in ws_sel_keys if key.entry_id in self._delayed_sync_eids ] self._setMsvResSelectionOnly(ws_sel_keys, self._delayed_sync_eids) self._delayed_sync_eids.clear() def _delayedSyncFromMsvToMsv(self, sel_model): """ Residue selection for linked sequences are only kept in sync while the associated structure is included in the workspace. When the associated structures are reincluded in the workspace, residue selection is not resynchronized until selection is changed in either the workspace or in the MSV. This method synchronizes residue selection for those entries by replacing residue selection in the MSV with residues selected in the given alignment selection model. If there are multiple sequences linked to the same structure chain, then the union of all residues selected in those sequences will be used. In this scenario, note that selection in these sequences will be updated to reflect this, which means that selection in the active tab *can* change as a result of this method. Note that this method does not update residue selection in the workspace. This method should only be called from inside a `_syncingSelection` block. """ if not self._delayed_sync_eids: return def include_res(res): return (res.hasStructure() and res.sequence.entry_id is not None and int(res.sequence.entry_id) in self._delayed_sync_eids) selected_res_keys = { res.getKey() for res in sel_model.getSelection() if include_res(res) } self._setMsvResSelectionOnly(selected_res_keys, self._delayed_sync_eids) self._delayed_sync_eids.clear()
def mapResidues(self, residues):
    # See parent class for method documentation
    if not residues:
        return []

    # Collect the structure keys of every real residue that has a structure
    # and whose chain data is flagged as included.
    linked_keys = []
    for cur_res in residues:
        cur_seq = cur_res.sequence
        if cur_res.is_res and cur_seq.hasStructure():
            entry_id = int(cur_seq.entry_id)
            chain_name = cur_res.structure_chain
            if self._entry_chain_map[entry_id][chain_name].included:
                linked_keys.append(
                    residue.get_residue_key(cur_res, entry_id, chain_name))

    # The result always contains the residues that were passed in, plus
    # every MSV residue that maps to one of the collected keys.
    mapped = set(residues)
    for cur_key in linked_keys:
        mapped.update(self._mapKeyToMSVResidues(cur_key))
    return mapped
def _setMSVResSelection(self, selected_keys, select):
    """
    Set the selection of structured residues across all tabs. Takes a
    collection of residue keys and whether to select or deselect them.

    Each key is mapped to its linked MSV residues with
    `_mapKeyToMSVResidues` and the combined residues are handed to
    `_setMSVResSelectionByRes`.

    :param selected_keys: collection of residue keys (in the same format
        returned by `._getKeyFromWHResidue()`)
    :type selected_keys: iterable(residue.ResidueKey)

    :param select: whether to select or deselect the associated residues
    :type select: bool
    """
    selected_res = [self._mapKeyToMSVResidues(key) for key in selected_keys]
    # Flatten the per-key collections into a single list of residues
    selected_res = list(itertools.chain(*selected_res))
    self._setMSVResSelectionByRes(selected_res, select)


def _setMSVResSelectionByRes(self,
                             selected_res,
                             select,
                             *,
                             standing_selection_override=False):
    """
    Set the selection of structured residues across all tabs. Takes a
    collection of residues and whether to select or deselect them.

    :param selected_res: collection of residues
    :type selected_res: iterable(residue.Residue)

    :param select: whether to select or deselect the associated residues
    :type select: bool

    :param standing_selection_override: If True and a tab is in the middle
        of a click-and-drag selection (or in the middle of a click that may
        turn into a click-and-drag selection), finish the current selection
        in that tab so we can update selection with these changes. (The
        click-and-drag can continue after this, but it will be considered a
        separate click-and-drag and will be part of a separate undo
        command.) If False, tabs in the middle of a click-and-drag selection
        will be skipped.
    :type standing_selection_override: bool
    """
    # Create mappings of sequences to the residues that we need to select
    # or deselect from them
    seq_to_selected_res = defaultdict(set)
    for res in selected_res:
        seq_to_selected_res[res.sequence].add(res)
    for page in self._gui_model.pages:
        split_aln = page.split_aln
        # Only the residues whose sequence is part of this page's
        # split-chain alignment are applied to this page.
        to_select = itertools.chain.from_iterable(
            res for seq, res in seq_to_selected_res.items()
            if seq in split_aln)
        aln = page.aln
        if standing_selection_override:
            # finishCurrentSelection is a no-op unless the selection model
            # is in the middle of a click-and-drag selection
            aln.res_selection_model.finishCurrentSelection()
        try:
            aln.res_selection_model.setSelectionState(to_select,
                                                      select,
                                                      _undoable=False)
        except gui_alignment.StandingSelectionError:
            # This catches the case where we attempt to sync selection
            # to an alignment whose selection is currently being modified.
            # We don't need to sync the selection for this alignment since
            # it's the selection we're syncing from.
            pass
        else:
            # Force the selectionChanged signal to emit immediately so we
            # can block them from causing redundant syncs.
            aln.res_selection_model.forceSelectionUpdate()


def _mapKeyToMSVResidues(self, key):
    """
    Map a key to all the residues in MSV that are linked to it. Expects a
    key formatted by `._getKeyFromWHResidue()` or
    `._getKeyFromStructureResidue`.

    :param key: a unique key representing the structure residue
    :type key: residue.ResidueKey

    :return: The residues reported by the chain data's
        `mapRescodeToResidues` for this key. An empty set is returned for
        keys with a negative entry id (scratch entries) and for chains that
        have no chain data.
    :rtype: set(residue.Residue)
    """
    if key.entry_id < 0:
        # This residue is from a scratch entry, so we can ignore it
        return set()
    entry_data = self._entry_chain_map[key.entry_id]
    try:
        chain_data = entry_data[key.chain]
    except KeyError:
        # This residue has no sequence data (e.g. ligand)
        return set()
    residues = chain_data.mapRescodeToResidues(key.chainKey())
    return residues


def _setMsvResSelectionOnly(self, to_select, eids):
    """
    For any sequences that are linked to a structure with the given entry
    ids, replace the residue selection.

    The structured residues that are not in `to_select` are deselected
    first (finishing any in-progress click-and-drag selection), and then
    the residues for `to_select` are selected.

    :param to_select: Residue keys for residues to select. Should only
        contain residue keys with entry ids in `eids`.
    :type to_select: set(residue.ResidueKey)

    :param eids: The entry ids to replace the selection of.
    :type eids: Iterable(int)
    """
    # chain_keys_per_chain[entry id][chain name] = set of chain keys to keep
    # selected
    chain_keys_per_chain = defaultdict(lambda: defaultdict(set))
    for key in to_select:
        chain_keys_per_chain[key.entry_id][key.chain].add(key.chainKey())
    to_deselect = []
    for cur_eid in eids:
        for chain, chain_data in self._entry_chain_map[cur_eid].items():
            # The nested defaultdict yields an empty set for chains that
            # have nothing to select.
            cur_to_select = chain_keys_per_chain[cur_eid][chain]
            cur_to_deselect = chain_data.getAllStructuredResiduesExcept(
                cur_to_select)
            to_deselect.extend(cur_to_deselect)
    self._setMSVResSelectionByRes(to_deselect,
                                  False,
                                  standing_selection_override=True)
    self._setMSVResSelection(to_select, True)


@QtCore.pyqtSlot(set, set)
@util.skip_if("_closing_project")
@util.skip_if("_syncing_selection")
@util.skip_if("_syncing_inclusion")
def _alignmentSelectionChanged(self, selected, deselected):
    """
    Update selection in the workspace when selection in the MSV changes.
    This should only be called by a `resSelectionChanged` signal on an
    `AlignmentSignals` object, since the selection model is looked up
    through `self.sender()`.

    :param selected: The residues that have been newly selected
    :type selected: iterable(schrodinger.protein.residue.Residue)

    :param deselected: The residues that have been newly deselected
    :type deselected: iterable(schrodinger.protein.residue.Residue)
    """
    # Only modify maestro selection if any of the changed residues
    # were actually in the workspace
    selected = self._convertMsvResiduesStResidues(selected)
    deselected = self._convertMsvResiduesStResidues(deselected)
    if not (selected or deselected):
        return
    aln_signals = self.sender()
    sel_model = aln_signals.aln.res_selection_model
    deselected_res_keys = [
        self._getKeyFromStructureResidue(res) for res in deselected
    ]
    selected_res_keys = [
        self._getKeyFromStructureResidue(res) for res in selected
    ]
    # NOTE(review): the extent of this `with` block was inferred from a
    # whitespace-collapsed listing; every statement below is assumed to run
    # inside it so that the resulting selection signals are skipped.
    with self._syncingSelection():
        self._delayedSyncFromMsvToMsv(sel_model)
        self._setMSVResSelection(deselected_res_keys, False)
        # We overwrite the entire workspace selection every time any of the
        # selection changes to make sure that we remove selection from any
        # structures that don't have linked sequences.
        selected_res_keys.extend(elem.getKey()
                                 for elem in sel_model.getSelection()
                                 if elem.is_res and elem.hasStructure())
        self._setMSVResSelection(selected_res_keys, True)
        self._syncSelectionToMaestro(sel_model.getSelection())


def _syncSelectionToMaestro(self, selection):
    """
    Given a selection, push the selection of any structured residue to
    Maestro. Note that this replaces any existing workspace selection; if
    the selection contains no structured residues, the workspace selection
    is cleared.

    :param selection: The residues to select in the workspace.
    :type selection: iterable[residue.Residue]
    """
    structured_selection = {res for res in selection if res.hasStructure()}
    if structured_selection:
        asl = self.generateMultiEntryResidueASL(structured_selection)
        maestro.command('workspaceselectionreplace ' + asl)
    else:
        maestro.command('workspaceselectionclear')
def delayedSyncFromMsvToWorkspace(self, aln):
    # See parent class for method documentation
    res_sel_model = aln.res_selection_model
    with self._syncingSelection():
        # Bring the linked MSV sequences up to date first; the selection is
        # only read afterwards, when it is pushed to the workspace.
        self._delayedSyncFromMsvToMsv(res_sel_model)
        current_selection = res_sel_model.getSelection()
        self._syncSelectionToMaestro(current_selection)
def _convertMsvResiduesStResidues(self, msv_residues):
    """
    Translate sequence residues into their structure residues.

    A sequence residue is skipped when it has no structure, when the chain
    data for its entry and chain is not flagged as included, or when its
    sequence has no structure residue for it.

    :param msv_residues: Sequence residues to convert
    :type msv_residues: Iterable(residue.AbstractSequenceElement or
        residue.CombinedChainResidueWrapper)

    :return: Structure residues, in the order of the input residues
    :rtype: list[schrodinger.structure._structure._Residue]
    """
    converted = []
    for msv_res in msv_residues:
        if msv_res.hasStructure():
            res_key = msv_res.getKey()
            chain_data = self._entry_chain_map[res_key.entry_id][res_key.chain]
            if chain_data.included:
                structure_res = msv_res.sequence.getStructureResForRes(msv_res)
                if structure_res is not None:
                    converted.append(structure_res)
    return converted
def onResidueMiddleClicked(self, res):
    """
    Fit the Maestro workspace view to the residue that was clicked with the
    middle mouse button.

    Nothing happens when `res` is None, when the residue has no structure,
    or when the visibility of its sequence is `Inclusion.Excluded`.

    :param res: clicked residue
    :type res: protein.residue.Residue
    """
    if res is None or not res.hasStructure():
        return
    seq = res.sequence
    if seq.visibility != Inclusion.Excluded:
        # The fit target is identified by chain name, residue number and
        # entry id.
        fit_cmd = (f'fit (chain. {res.chain} AND res.num {res.resnum}) AND '
                   f'e.id {seq.entry_id}')
        maestro.command(fit_cmd)
def disassociateChains(self, entry_id, is_workspace=False, keep_chains=None):
    """
    Disassociates chains for an entry.

    Runs Maestro's ``entrydisassociatebychain`` command on the entry,
    re-includes the entries that were included beforehand, and includes the
    newly created entries that are kept.

    :param entry_id: The entry ID to split
    :type entry_id: int

    :param is_workspace: If True, the returned sequences are taken from the
        workspace alignment after the new entries have been included,
        instead of from `getSeqsForEids`.
    :type is_workspace: bool

    :param keep_chains: If not None, only new sequences whose chain name is
        in this collection are kept, and only their entries are included in
        the workspace.
    :type keep_chains: collection of str, or None

    :return: Disassociated sequences, now with unique entry ids
    :rtype: list(sequence.Sequence)
    """
    proj = maestro.project_table_get()
    # Snapshot the project before the command so that the entries it
    # creates can be told apart from the pre-existing ones.
    orig_included = {row.entry_id for row in proj.included_rows}
    orig_eids = {row.entry_id for row in proj.all_rows}
    # NOTE(review): the extent of this `with` block was inferred from a
    # whitespace-collapsed listing - confirm which statements belong inside.
    with self._changingMaestroVisibility():
        maestro.command("entrydisassociatebychain entry %i" % int(entry_id))
        new_eids = set()
        for row in proj.all_rows:
            eid = row.entry_id
            if eid in orig_included:
                # Re-include original entries
                row.in_workspace = project.IN_WORKSPACE
            elif eid not in orig_eids:
                # Temporarily exclude new entries
                row.in_workspace = project.NOT_IN_WORKSPACE
                new_eids.add(eid)
    # Get the new sequences
    new_seqs = self.getSeqsForEids(sorted(new_eids))
    if keep_chains is not None:
        # Narrow both the sequences and the entry ids to the kept chains
        new_seqs = [seq for seq in new_seqs if seq.chain in keep_chains]
        new_eids = {seq.entry_id for seq in new_seqs}
    for eid in new_eids:
        proj[eid].in_workspace = project.IN_WORKSPACE
    if is_workspace:
        # Now that they have been included, get the corresponding sequences
        # from the workspace alignment
        new_seqs = [
            seq for seq in self._workspace_aln if seq.entry_id in new_eids
        ]
    return new_seqs
def superimposeByAlignment(self, entry_residue_map):
    """
    Creates and runs a Maestro command to superimpose structures by aligned
    residues in the Multiple Sequence Viewer.

    Entries that are included in the workspace but are not keys of
    `entry_residue_map` are temporarily excluded while the commands run and
    are re-included afterwards, even if one of the commands raises. Entries
    in `entry_residue_map` that were not yet included are included and stay
    included.

    If the ``superimposeset`` command raises, the error is reported to the
    user in a message box rather than propagated.

    :param entry_residue_map: Mapping whose keys are the entry ids of the
        structures to superimpose. The whole mapping is passed to
        `generateEntryResidueASL` to build the ASL of the residues to
        superimpose on.
    :type entry_residue_map: dict

    :note: The previous docstring listed ``ValueError`` for non-unique
        entry ids. Nothing in this method raises it directly; confirm
        whether `generateEntryResidueASL` does.
    """
    # Cache list of included entries
    proj = maestro.project_table_get()
    included_eids = {row.entry_id for row in proj.included_rows}
    wanted_eids = set(entry_residue_map.keys())
    temp_exclude = included_eids - wanted_eids
    new_include = wanted_eids - included_eids
    try:
        # Exclude currently included entries that aren't being superimposed
        for eid in temp_exclude:
            proj[eid].in_workspace = project.NOT_IN_WORKSPACE
        for eid in new_include:
            proj[eid].in_workspace = project.IN_WORKSPACE
        asl = self.generateEntryResidueASL(entry_residue_map)
        if asl == "":
            maestro.command("workspaceselectionclear")
        else:
            # Restrict the superposition to alpha carbons
            asl = f"atom.ptype CA AND ({asl})"
            maestro.command("workspaceselectionreplace " + asl)
        # NOTE(review): placement of this `try` after (not inside) the
        # `else` branch was inferred from a whitespace-collapsed listing.
        try:
            maestro.command("superimposeset " + asl)
        except Exception as e:
            QtWidgets.QMessageBox.critical(
                None,
                "Structure Superposition from Sequence Alignment Failed",
                str(e))
    finally:
        # Re-include, regardless of whether the commands above succeeded, so
        # a failure cannot leave the user's entries excluded.
        for eid in temp_exclude:
            proj[eid].in_workspace = project.IN_WORKSPACE
@util.skip_if("_updating_color")
def onWorkspaceChanged(self, changed):
    """
    Callback that is called whenever the maestro workspace changes. See
    maestro_callback for details.

    Emits `workspaceColorsChanged` when the change is a color change or an
    "everything" change; all other changes are ignored.

    :param changed: What kind of change occurred in the workspace
    :type changed: a WORKSPACE_CHANGED_* constant in maestro.py
    """
    color_changes = (
        maestro.WORKSPACE_CHANGED_COLOR,
        maestro.WORKSPACE_CHANGED_EVERYTHING,
    )
    if changed not in color_changes:
        return
    self.workspaceColorsChanged.emit()
def _get_ResidueKey(self, res):
    """
    Build the key that identifies a structure residue in the workspace.

    The entry ID is read from the residue's first atom (``res.atom[1]``).

    :param res: The residue to turn into a key
    :type res: schrodinger.structure._Residue

    :return: A unique key to identify the residue in the workspace
    :rtype: residue.ResidueKey
    """
    entry_id = res.atom[1].entry_id
    return residue.get_structure_residue_key(res, entry_id)
def getWorkspaceColors(self):
    """
    Collect the color of every sequence residue in the workspace.

    The color is read from a single representative atom: the alpha carbon
    for amino acid residues, or the C1' sugar atom for nucleotide residues.
    Residues that have neither atom are left out of the result.

    :return: The colors of each residue in the workspace. Each residue is
        represented by a tuple of (entry_id, chain, resnum, inscode) and
        each color is represented by a tuple of (r,g,b) values.
    :rtype: dict(residue.ResidueKey, tuple(int, int, int))
    """
    workspace_st = maestro.workspace_get()
    colors = {}
    for cur_res in workspace_st.residue:
        ref_atom = cur_res.getAlphaCarbon()
        if ref_atom is None:
            # No C-alpha: fall back to the nucleotide C1' atom
            ref_atom = cur_res.getAtomByPdbName(" C1'")
        if ref_atom is not None:
            # Neither an amino acid nor a nucleotide when ref_atom is None,
            # in which case the residue's color is not needed.
            res_key = self._get_ResidueKey(cur_res)
            colors[res_key] = ref_atom.color.rgb
    return colors
def setWorkspaceColors(self, color_map, all_atoms=False):
    """
    Apply the colors in ``color_map`` to the matching project structures.

    Residues that do not appear in the color map keep their current color.
    Within a recolored residue only carbon atoms are changed unless
    ``all_atoms`` is True. Each affected entry's structure is written back
    to the project inside the ``_updatingColor`` context.

    :param color_map: The new colors that residues should have. Each
        residue is represented by a 4-tuple of (entry_id, chain, resnum,
        inscode), and each color is represented by a tuple of (r,g,b)
        values. ``None`` keys are ignored.
    :type color_map: dict(residue.ResidueKey, tuple(int, int, int))

    :param all_atoms: Whether to color all atoms or just carbons
    :type all_atoms: bool
    """
    proj = maestro.project_table_get()
    # Entries that own at least one residue in the map
    entry_ids = {
        res_key.entry_id for res_key in color_map if res_key is not None
    }
    for entry_id in entry_ids:
        entry_st = proj[entry_id].getStructure()
        for cur_res in entry_st.residue:
            rgb = color_map.get(self._get_ResidueKey(cur_res))
            if rgb is not None:
                for cur_atom in cur_res.atom:
                    if all_atoms or cur_atom.element == 'C':
                        cur_atom.setColorRGB(*rgb)
        with self._updatingColor():
            proj[entry_id].setStructure(entry_st)
class StandaloneStructureModel(AbstractStructureModel):
    """
    A structure model for when the MSV is run directly from the command line.

    :note: When copying a sequence, this structure model currently strips all
        structural information from the copy. If we need the copy to retain
        structural information, we should make sure that setting the
        structure on one copied chain updates the structure on all other
        copied chains without affecting the structure from the original
        sequences.
    """

    def __init__(self):
        super().__init__()
        # Next fake entry ID to hand out; there is no project to supply one.
        self._eid = 1

    def renameSeq(self, seq, new_name):
        """
        Rename the specified sequence

        :param seq: Sequence to be renamed
        :type seq: sequence.ProteinSequence

        :param new_name: New name for the sequence
        :type new_name: str
        """
        aln = self._gui_model.getAlignmentOfSequence(seq)
        aln.renameSeq(seq, new_name)

    def _readStructures(self, filename):
        # See AbstractStructureModel for method documentation
        strucs = list(structure.StructureReader(filename))
        seqs = []
        for cur_struc in strucs:
            # Since there's no project, we generate fake entry ids
            eid = self._eid
            self._eid += 1
            # Set eid on structure so eid-based ASLs work
            cur_struc.property['s_m_entry_id'] = str(eid)
            cur_seqs = self._convertStructure(cur_struc, eid)
            # We use weakrefs so that we don't prevent garbage collection
            # for sequences or structures. The list is the same for every
            # sequence of this structure and is only ever read, so it is
            # built once here rather than once per sequence.
            weak_seqs = list(map(weakref.ref, cur_seqs))
            for cur_seq in cur_seqs:
                cur_seq._get_structure = partial(copy.copy, cur_struc)
                cur_seq._set_structure = partial(self._setStructure,
                                                 weak_seqs)
            seqs.extend(cur_seqs)
        for cur_seq in seqs:
            cur_seq.sequenceCopied.connect(self._sequenceCopied)
        return seqs

    def _setStructure(self, weak_seqs, struc):
        """
        Set the structure on all given sequences.

        :param weak_seqs: A list of weak references to sequences.  If the
            referenced sequence has been deleted, it will be ignored.
        :type weak_seqs: list[weakref.ref]

        :param struc: The new structure to set.
        :type struc: structure.Structure
        """
        for cur_weak_seq in weak_seqs:
            seq = cur_weak_seq()
            if seq is not None:
                # Each getter call hands out a fresh copy of `struc`
                seq._get_structure = partial(copy.copy, struc)

    @QtCore.pyqtSlot(object, object)
    def _sequenceCopied(self, orig_seq, copy_seq):
        """
        When a sequence that we're monitoring is copied, strip all structural
        information from the copy.  See the class docstring for additional
        information.

        :param orig_seq: The sequence being copied.
        :type orig_seq: schrodinger.protein.sequence.Sequence

        :param copy_seq: The newly created copy.
        :type copy_seq: schrodinger.protein.sequence.Sequence
        """
        copy_seq.entry_id = None
[docs]class PyMolStructureModel(AbstractStructureModel): """ A stub for a PyMol structure model. """