Source code for schrodinger.application.msv.gui.msv_widget

import copy
import cProfile as profile
import itertools
import os
import time
import weakref
from collections import Counter
from collections import defaultdict
from functools import partial

import decorator
import inflect

import schrodinger
from schrodinger.application.msv import command
from schrodinger.application.msv import seqio
from schrodinger.application.msv import structure_model
from schrodinger.application.msv.gui import color
from schrodinger.application.msv.gui import dialogs
from schrodinger.application.msv.gui import gui_alignment
from schrodinger.application.msv.gui import gui_models
from schrodinger.application.msv.gui import picking
from schrodinger.application.msv.gui import toolbar
from schrodinger.application.msv.gui import validate_align
from schrodinger.application.msv.gui import view
from schrodinger.application.msv.gui import viewconstants
from schrodinger.application.msv.gui import viewmodel
from schrodinger.application.msv.gui.picking import PickMode
from schrodinger.infra import util
from schrodinger.models import mappers
from schrodinger.protein import align
from schrodinger.protein import annotation
from schrodinger.protein import constants
from schrodinger.protein import predictors
from schrodinger.protein import properties
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.tasks import binding_site_align
from schrodinger.protein.tasks import blast
from schrodinger.protein.tasks import clustal
from schrodinger.protein.tasks import descriptors
from schrodinger.protein.tasks import kinase
from schrodinger.protein.tasks import muscle
from schrodinger.protein.tasks import optimize_alignment
from schrodinger.protein.tasks import pfam
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtPrintSupport
from schrodinger.Qt import QtSvg
from schrodinger.Qt import QtWidgets
from schrodinger.structutils.interactions import protein_protein_interactions
from schrodinger.tasks import queue
from schrodinger.tasks import tasks
from schrodinger.ui.qt import appframework as af1
from schrodinger.ui.qt import basewidgets
from schrodinger.ui.qt import utils as qt_utils
from schrodinger.ui.qt.appframework2 import application
from schrodinger.utils import profiling
from schrodinger.utils.fileutils import slugify
from typing import Union

# Whatever `schrodinger.get_maestro()` returns at import time.
# NOTE(review): presumably falsy/None when running outside Maestro - confirm.
maestro = schrodinger.get_maestro()

# Short aliases for annotation type collections defined in
# `schrodinger.protein.annotation` and `schrodinger.protein.predictors`
SEQ_ANNO_TYPES = annotation.ProteinSequenceAnnotations.ANNOTATION_TYPES
PRED_ANNO_TYPES = annotation.ProteinSequenceAnnotations.PRED_ANNOTATION_TYPES
ALN_ANNO_TYPES = annotation.ProteinAlignmentAnnotations.ANNOTATION_TYPES
RES_PROP_ANNOS = annotation.ProteinSequenceAnnotations.RES_PROPENSITY_ANNOTATIONS
PRED_ANNO_TO_PRED_FUNC = predictors.PRED_ANNO_TO_PRED_FUNC
# NOTE(review): presumably the structure property that stores the binding
# site alignment RMSD - confirm against the binding_site_align task
PROPNAME_BINDINGSITE_RMSD = 'r_psp_BindingSiteAlign_RMSD'
MAX_FASTA_LINE_LENGTH = 60
# Unique placeholder object; only meaningful when compared by identity
SENTINEL = object()

# Message text about descriptor values being discarded for modified sequences
_DESCRIPTORS_CLEARED_WARNING_MSG = (
    "The descriptor values associated with the modified sequences are invalid and "
    "will be discarded. \n\nWhen you have finished editing the sequences or "
    "structures, use Compute Sequence Descriptors... from the "
    "Other Tasks menu to recompute them.")


def format_seq_name_with_chain(seq, include_chain):
    """
    Build a display name for a sequence, optionally suffixed with its chain
    name in parentheses.

    :param seq: The sequence to name. May be None.
    :type seq: `sequence.Sequence`

    :param include_chain: Whether to append the chain name
    :type include_chain: bool

    :return: The sequence name, followed by " (<chain>)" when
        `include_chain` is set and both the name and the chain are
        non-empty. An empty string if `seq` is None.
    :rtype: str
    """
    if seq is None:
        return ""
    label = seq.name
    # An empty name is returned untouched, even if a chain is available
    if not (label and include_chain):
        return label
    chain = seq.chain
    if not chain:
        return label
    return label + f" ({chain})"
@decorator.decorator
def prompt_if_seqs_hidden(func, self, *args, **kwargs):
    """
    Decorator for widget methods that consults the hidden-sequences prompt
    before running the decorated method.

    `dialogs.prompt_for_hidden_seqs` is called with the widget and its
    alignment. The decorated method is only run (and its result returned) if
    that call returns a truthy value; otherwise the method is skipped and
    None is returned.

    NOTE(review): the exact meaning of the value returned by
    `prompt_for_hidden_seqs` is not visible here - confirm that "truthy"
    means "OK to proceed".

    :type self: AbstractMsvWidget
    """
    prompt_result = dialogs.prompt_for_hidden_seqs(self, self.getAlignment())
    if not prompt_result:
        return None
    return func(self, *args, **kwargs)
def _getNextSetName(last_name): """ Generate a name string for the set by appending a number incrementally to the previous name. eg :: 'Set 1' --> 'Set 2' 'MySet' --> 'MySet 2' :param last_name: Mame of the previous set. :type last_name: str :rtype: str """ split_list = last_name.split() num = split_list[-1] if num.isnumeric(): new_num = int(num) + 1 split_list[-1] = str(new_num) else: split_list.append("2") name = " ".join(split_list) return name
class MsvTaskQueue(queue.TaskQueue):
    """
    A task queue that additionally carries a description.
    """
    # Description for the queue; None until one is assigned.
    # NOTE(review): presumably used for display in the task status bar -
    # confirm against the code that reads it.
    description: str = None
class AbstractMsvWidget(mappers.MapperMixin, basewidgets.BaseWidget):
    """
    Acts as a controller/container for a ProteinAlignment, the viewmodel
    proxy stacks that wrap it, and the views required to present these proxy
    stacks. Each tab in the MSV contains a different instance of an MSV
    Widget.

    :cvar residueHovered: Signal emitted when a residue cell is hovered
    :cvar residueUnhovered: Signal emitted when a residue cell is unhovered
    :cvar resHighlightStatusChanged: Signal emitted when residue highlighting
        changes. Emitted with whether any residues are highlighted.
    :cvar renameSequenceRequested: A signal emitted to notify listeners to
        open the rename sequence dialog
    :cvar taskStarted: Signal emitted when a task is started. Task types
        should have a description defined in
        `toolbar.TaskStatusBar.PRETTY_TASK_NAMES`.
    """
    model_class = gui_models.PageModel
    duplicateIntoNewTabRequested = QtCore.pyqtSignal()
    translateIntoNewTabRequested = QtCore.pyqtSignal(list)
    residueHovered = QtCore.pyqtSignal(object)
    residueUnhovered = QtCore.pyqtSignal()
    openColorPanelRequested = QtCore.pyqtSignal()
    resHighlightStatusChanged = QtCore.pyqtSignal(bool)
    editToolbarUpdateRequested = QtCore.pyqtSignal()
    renameSequenceRequested = QtCore.pyqtSignal()
    taskStarted = QtCore.pyqtSignal(tasks.AbstractTask)
    # We want to flag when the MSV widget modifies the options model so that
    # changes in the options model don't re-trigger changes in the MSV widget.
    _changingOptionsModel = util.flag_context_manager("_changing_options_model")
def __init__(self, parent=None, struc_model=None, *, undo_stack):
    """
    :param parent: The Qt parent
    :type parent: QtWidgets.QWidget

    :param struc_model: The structure model to use when interacting with
        sequences associated with three-dimensional structures. If None, a
        new structure model will be created.
    :type struc_model: structure_model.StructureModel

    :param undo_stack: The undo stack to use.
    :type undo_stack: schrodinger.application.msv.command.UndoStack
    """
    self._curr_color_scheme = None
    # TODO MSV-2239 find better way to hide initialization commands
    # We set the undo stack below. If we set it here then some
    # initialization steps would unintentionally add commands to the stack.
    self.undo_stack = None
    # Needed to create alignment view (read by `initSetUp`)
    self._undo_stack = undo_stack
    super().__init__(parent=parent)
    # Note that `self.undo_stack` is still None here, so a structure model
    # created by `setStructureModel` is constructed with a None undo stack.
    self.setStructureModel(struc_model)
    # Now that initialization is done, we can safely set the undo stack
    # without the risk of unintentionally adding commands.
    self.undo_stack = undo_stack
    del self._undo_stack
    self.getAlignment().setUndoStack(self.undo_stack)
def initSetOptions(self):
    """
    Give the widget's instance attributes their initial values.
    """
    super().initSetOptions()

    # Placeholders that stay None until assigned elsewhere
    self._table_model = None
    self.dendrogram_viewer = None
    self.get_sequences_dialog = None
    self.noscroll_view = None

    # Flags
    self._changing_options_model = None
    self._syncing_dendrogram_selection = False
    self._light_mode = False

    # Bookkeeping
    self._current_pattern_idx = None
    self._previous_set_name = None
def initSetUp(self):
    """
    Create the status models, the table (view) model and the three views
    (alignment, alignment info, alignment metrics), and wire the views'
    signals to this widget's slots and signals.
    """
    super().initSetUp()
    self.seq_status_model = toolbar.AlnStatusModel()
    self.res_status_model = toolbar.ResStatusModel()
    self._table_model = self._getAlignmentViewModel()

    # viewer related setup
    # `_undo_stack` is the temporary attribute set in `__init__`
    self.view = self._getAlignmentView(self._undo_stack)
    self.view.setFrameShape(QtWidgets.QFrame.NoFrame)
    # Hover and color panel signals are re-emitted as this widget's signals
    self.view.residueHovered.connect(self.residueHovered)
    self.view.residueUnhovered.connect(self.residueUnhovered)
    self.view.residueMiddleClicked.connect(self._onResidueMiddleClicked)
    self.view.openColorPanelRequested.connect(self.openColorPanelRequested)

    self.aln_info_view = self._getAlignmentInfoView()
    # yapf: disable
    self.aln_info_view.deleteFromAllRequested.connect(self.deleteAnnsFromAll)
    self.aln_info_view.clearAnnotationRequested.connect(self.clearAnnotations)
    self.aln_info_view.clearConstraintsRequested.connect(self.clearConstraints)
    self.aln_info_view.deselectResiduesClicked.connect(self.deselectResForSeqs)
    self.aln_info_view.duplicateAsRefSeqRequested.connect(self.duplicateAsRef)
    self.aln_info_view.duplicateAtBottomRequested.connect(self.duplicateAtBottom)
    self.aln_info_view.duplicateAtTopRequested.connect(self.duplicateAtTop)
    self.aln_info_view.duplicateInPlaceRequested.connect(self.duplicateInPlace)
    self.aln_info_view.duplicateIntoNewTabRequested.connect(self.duplicateIntoNewTabRequested)
    self.aln_info_view.findHomologsClicked.connect(self.blastSelectedSeq)
    self.aln_info_view.findInListRequested.connect(self.enableFindSequence)
    self.aln_info_view.mvSeqsClicked.connect(self.moveSelectedSequences)
    self.aln_info_view.renameSeqClicked.connect(self.renameSequenceRequested)
    self.aln_info_view.renameAlnSetClicked.connect(self._renameAlnSet)
    self.aln_info_view.selectAlnSetClicked.connect(self._selectAlnSet)
    self.aln_info_view.deselectAlnSetClicked.connect(self._deselectAlnSet)
    self.aln_info_view.dissolveAlnSetClicked.connect(self._dissolveAlnSet)
    self.aln_info_view.gatherAlnSetsClicked.connect(self._gatherAlnSets)
    self.aln_info_view.rmSeqsClicked.connect(self.removeSelectedSeqs)
    self.aln_info_view.exportSequencesRequested.connect(self._exportSelectedSeqs)
    self.aln_info_view.hideSeqsRequested.connect(self.hideSelectedSeqs)
    self.aln_info_view.selectRowResidues.connect(self.selectResForSeqs)
    self.aln_info_view.setAsReferenceSeq.connect(self.setSelectedSeqAsReference)
    self.aln_info_view.sortRequested.connect(self.sortBy)
    self.aln_info_view.alnSetCreateRequested.connect(self._createAlnSet)
    self.aln_info_view.alnSetAddRequested.connect(self._addToAlnSet)
    self.aln_info_view.alnSetRemoveRequested.connect(self._removeSeqsFromAlnSet)
    self.aln_info_view.translateDnaRnaRequested.connect(
        self.translateSelectedSequences)
    # yapf: enable

    self.aln_metrics_view = self._getAlignmentMetricsView()
    self.aln_metrics_view.sortRequested.connect(self.sortBy)

    # Main view widget and load sequences bar setup
    self.view_widget = QtWidgets.QWidget(self)
    self.view_widget.setObjectName("view_widget")
    # Keep the main view pointed at whatever the table model reports as its
    # top model; the other two views use fixed sub-models.
    self._table_model.topModelChanged.connect(self.view.setModel)
    self.aln_metrics_view.setModel(self._table_model.metrics_model)
    self.aln_info_view.setModel(self._table_model.info_model)

    # Export dialogs are created lazily by the corresponding properties
    self._seq_export_dialog = None
    self._image_export_dialog = None
def initLayOut(self):
    """
    Arrange the views: a grid holding the vertical scrollbar, the alignment
    view, the metrics view and the horizontal scrollbar, placed to the right
    of the alignment info view inside a splitter.
    """
    super().initLayOut()
    view_layout = QtWidgets.QGridLayout()
    self.view_widget.setLayout(view_layout)
    view_layout.setSpacing(0)

    # Extract scrollbar from the view and insert it to the left of the view
    self.v_scrollbar = self.view.verticalScrollBar()
    self.v_scrollbar.setObjectName("v_scrollbar")
    self.v_scrollbar.setEnabled(False)
    self.v_scrollbar.rangeChanged.connect(self.onVScrollBarRangeChanged)
    view_layout.addWidget(self.v_scrollbar, 0, 0)

    # MSV-1875 Place horizontal scrollbar beneath views to prevent
    # blocking of viewports
    self.view.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAlwaysOff)
    self.h_scrollbar = self.view.horizontalScrollBar()
    self.h_scrollbar.setObjectName("h_scrollbar")
    view_layout.addWidget(self.h_scrollbar, 1, 1)

    view_layout.addWidget(self.view, 0, 1)
    view_layout.addWidget(self.aln_metrics_view, 0, 2)
    view_layout.setContentsMargins(0, 0, 0, 0)
    view_layout.setRowStretch(0, 1)

    # Align views on 2 sides of the splitter by adding padding to aln_info:
    hbarheight = self.h_scrollbar.height()
    aln_info_layout = QtWidgets.QHBoxLayout()
    aln_info_layout.setContentsMargins(0, 0, 0, hbarheight)
    aln_info_layout.addWidget(self.aln_info_view)

    # Create a splitter, with aln_info_view on the left, and view_widget
    # on the right:
    self.splitter = QtWidgets.QSplitter()
    self.splitter.setObjectName("msv_widget_splitter")
    outer_layout = QtWidgets.QHBoxLayout(self.splitter)
    outer_layout.addLayout(aln_info_layout)
    outer_layout.addWidget(self.view_widget)
    outer_layout.setContentsMargins(0, 0, 0, 0)
    # So the splitter doesn't snap shut
    self.splitter.setCollapsible(0, False)
    self.splitter.setCollapsible(1, False)
    # Resizing the main window should resize view_widget not aln_info
    self.splitter.setStretchFactor(1, 1)
    # Setting a minimum width so at least one column appears
    self.view.setMinimumWidth(25)

    self.layout().setContentsMargins(0, 0, 0, 0)
    self.layout().addWidget(self.splitter)

    self.updateColorsEnabled()
    self.updateSeqColorScheme()
    # don't enable row wrap until after the table's geometry is set
    QtCore.QTimer.singleShot(0, self.updateRowWrap)
@QtCore.pyqtSlot(int, int)
def onVScrollBarRangeChanged(self, _, max_):
    """
    Enable the vertical scrollbar only while its range has a non-zero
    maximum, then refresh its style so that a disabled scrollbar (i.e. the
    min and max are 0) can be styled to be mostly invisible.

    :param _: The new minimum of the scrollbar range (unused)
    :type _: int

    :param max_: The new maximum of the scrollbar range
    :type max_: int
    """
    scrollbar = self.v_scrollbar
    has_range = max_ != 0
    scrollbar.setEnabled(has_range)
    qt_utils.update_widget_style(scrollbar)
def setModel(self, model):
    """
    Set the page model and propagate it to the views and the table model.

    :param model: The page model to use
    """
    # MapperMixin
    super().setModel(model)

    page_model = self.model
    in_workspace = page_model.is_workspace
    self.setProperty("is_ws_tab", in_workspace)
    self.view.setIsWorkspace(in_workspace)

    menu_statuses = page_model.menu_statuses
    self.view.setMenuModel(menu_statuses)
    self.aln_info_view.setMenuModel(menu_statuses)

    table_model = self._table_model
    table_model.setPageModel(model)
    self._onAlnChanged(model.aln)
    self.updateExpandEnabled()
    self.view.setModel(table_model.top_model)

    self._loadInitialColorScheme()
    self._updateRenumberResEnabled()
    self.updateGetStructureLinkSeq()
def _loadInitialColorScheme(self):
    """
    Apply the sequence color scheme of the model that was just set. If
    colors are disabled in the options, `color.NoColorScheme` is applied
    instead.
    """
    options = self.model.options
    scheme = (options.seq_color_scheme
              if options.colors_enabled else color.NoColorScheme())
    self._table_model.updateColorScheme(viewconstants.RowType.Sequence,
                                        scheme)
    if not isinstance(scheme, color.WorkspaceScheme):
        return
    # If we're in the process of loading the Maestro project, then the
    # workspace isn't populated yet, so we need to wait before we get
    # the workspace colors
    QtCore.QTimer.singleShot(0, self.pullWorkspaceColors)
def getSignalsAndSlots(self, model):
    """
    Return the (signal, slot) pairs that tie the given page model, its
    alignment signals and its options model to this widget.

    :param model: The page model whose signals should be used

    :return: A list of (signal, slot) tuples
    :rtype: list[tuple]
    """
    options_model = model.options
    signals = model.aln_signals
    ss = [
        (signals.hiddenSeqsChanged, self._updateSeqStatusModel),
        (signals.resHighlightStatusChanged, self.resHighlightStatusChanged),
        (signals.resHighlightStatusChanged, self.updateRemoveHighlightsEnabled),
        (signals.resOutlineStatusChanged, self._updateResOutlineState),
        (signals.pairwiseConstraintsChanged, self._updateClearConstraintsEnabled),
        (signals.syncWsResSelection, self._syncWsResSelectionForSeqs),
        (signals.sequenceStructureChanged, self.updateCanUnlinkSequences),
        (signals.sequenceStructureChanged, self.updateGetStructureLinkSeq),
        (signals.resSelectionChanged, self._onResSelectionChanged),
        (signals.seqSelectionChanged, self._onSeqSelectionChanged),
        (signals.alnSetChanged, self._onAlnSetChanged),
        (model.alnChanged, self._onAlnChanged),
        (options_model.colors_enabledChanged, self.updateColorsEnabled),
        (options_model.group_byChanged, self.updateExpandEnabled),
        (options_model.seq_color_schemeChanged, self.updateSeqColorScheme),
        (options_model.wrap_sequencesChanged, self.updateRowWrap),
        (options_model.pick_modeChanged, self._updatePickMode),
        (options_model.custom_color_schemeChanged, self._onCustomColorSchemeChanged),
        (signals.predictionsChanged, self._updateCanDeleteAllPredictions),
        (signals.alignmentNumColumnsChanged, self._updateColorScheme),
        (signals.sequencesReordered, self._updateColorScheme),
        (signals.anchoredResiduesChanged, self.updateAnchorSelectionEnabled),
        (signals.sequenceVisibilityChanged, self._updateStatusModelNumStructs),
        (signals.sequencesAboutToBeInserted, self._clearSeqFilter),
        (signals.sequencesInserted, self._updateSeqStatusModel),
        (signals.sequencesInserted, self.updateSelectMenuEnabled),
        (signals.sequencesInserted, self._updateRenumberResEnabled),
        (signals.sequencesInserted, self.updateSelectProteinInterfaceMenuEnabled),
        (signals.sequencesInserted, self._generateKinaseFeatures),
        (signals.sequencesRemoved, self._updateSeqStatusModel),
        (signals.sequencesRemoved, self.updateSelectMenuEnabled),
        (signals.sequencesRemoved, self.updateSelectProteinInterfaceMenuEnabled),
        (signals.sequencesRemoved, self._updateRenumberResEnabled),
        (signals.sequencesReordered, self._updateStatusModelRefSeq),
        (signals.sequencesReordered, self.updateMoveSequencesEnabled),
        (signals.sequencesReordered, self.updateSetAsRefEnabled),
        (signals.sequenceStructureChanged, self.updateSelectProteinInterfaceMenuEnabled),
        (signals.homologyCompositeResiduesChanged, self._onCompositeResiduesChanged),
    ]  # yapf: disable
    # Every one of these signals triggers the same residue/edit status update
    edit_toolbar_signals = [
        signals.alignmentCleared,
        signals.sequencesInserted,
        signals.sequencesRemoved,
        signals.sequencesReordered,
        signals.residuesRemoved,
        signals.residuesAdded,
        signals.resSelectionChanged
    ]  # yapf: disable
    for cur_sig in edit_toolbar_signals:
        ss.append((cur_sig, self._updateResStatusAndEditStatus))
    return ss
@QtCore.pyqtSlot()
def _onResSelectionChanged(self):
    """
    Refresh everything that depends on the residue selection.
    """
    updaters = (
        self.updateAnchorSelectionEnabled,
        self.updateRemoveHighlightsEnabled,
        self._updateCanAlignBindingSites,
    )
    for update in updaters:
        update()

@QtCore.pyqtSlot()
def _onSeqSelectionChanged(self):
    """
    Refresh everything that depends on the sequence selection.
    """
    # Called in this exact order
    updaters = (
        self.updateCanUnlinkSequences,
        self._updateStatusModelNumSelected,
        self.updateAlignSelectedSeqsEnabled,
        self.updateDeleteSequencesEnabled,
        self.updateDuplicateSeqOptionsEnabled,
        self._updateEditSequenceAsTextEnabled,
        self.updateMoveSequencesEnabled,
        self.updateRenameSeqEnabled,
        self.updateSetAsRefEnabled,
        self._updateRemoveFromAlnSetEnabled,
        self._updateAlignAlnSetEnabled,
        self._updateDendrogramSelection,
        self.updateGetStructureLinkSeq,
    )
    for update in updaters:
        update()
def getValidIdMap(self):
    """
    Look up the valid PDB IDs of the currently selected sequences.

    :return: Map of valid PDB IDs to their source sequence
    :rtype: dict(str: sequence.Sequence)
    """
    return seqio.get_valid_pdb_id_map_for_seqs(self.getSelectedSequences())
def updateGetStructureLinkSeq(self):
    """
    Update the sequence context menu flags for getting structures from the
    PDB and for linking/unlinking sequences. Both are disabled in the
    workspace tab; otherwise linking is enabled and getting PDB structures
    is enabled only if a selected sequence has a valid PDB ID.
    """
    valid_id_map = self.getValidIdMap()
    menu_model = self.aln_info_view.seq_context_menu.model
    in_workspace = self.isWorkspace()
    menu_model.can_get_pdb_sts = (not in_workspace) and bool(valid_id_map)
    menu_model.can_link_or_unlink_sequences = not in_workspace
def _updateDendrogramSelection(self):
    """
    Update selection in the dendrogram so that it matches selection in the
    MSV. Does nothing if there is no dendrogram viewer.
    """
    if not self.dendrogram_viewer:
        return
    selected_seqs = self.getSelectedSequences()
    # Suppress the viewer's signals so that this update isn't reported back
    # to us as a dendrogram selection change
    with qt_utils.suppress_signals(self.dendrogram_viewer):
        self.dendrogram_viewer.setSelection(selected_seqs)

@util.skip_if("_syncing_dendrogram_selection")
def _updateSelectionFromDendrogram(self):
    """
    Respond to selection changes from the dendrogram by selecting
    corresponding sequences
    """
    self._syncing_dendrogram_selection = True
    try:
        seqs_to_select = self.dendrogram_viewer.getSelection()
        self.setSelectedSequences(seqs_to_select)
    finally:
        # Always clear the flag. If it were left set after an exception,
        # every later call of this method would be skipped.
        self._syncing_dendrogram_selection = False

@QtCore.pyqtSlot(object)
@QtCore.pyqtSlot(color.AbstractRowColorScheme)
def _onCustomColorSchemeChanged(self, scheme):
    """
    Update colors if the custom color scheme changed. This is for handling
    when the custom color scheme is changed on a different page and this
    tab is currently using the custom color scheme.

    :param scheme: The new custom color scheme. If None, the sequence color
        scheme option is reset instead.
    :type scheme: color.AbstractRowColorScheme or None
    """
    model = self.model
    table_model = self._table_model
    if not self._changing_options_model and model.options.seq_color_scheme.custom:
        if scheme is not None:
            model.options.seq_color_scheme = scheme
            table_model.updateColorScheme(viewconstants.RowType.Sequence,
                                          scheme)
        else:
            model.reset(gui_models.PageModel.options.seq_color_scheme)
            table_model.updateColorScheme(viewconstants.RowType.Sequence,
                                          model.options.seq_color_scheme)

@property
def options_model(self):
    """
    :return: The options of the current page model
    """
    return self.model.options

@property
def image_export_dialog(self):
    """
    Delay instantiation of the image export dialog until it is needed to
    prevent unnecessary file handles from being opened

    :return: the image export dialog
    :rtype: dialogs.MSVImageExportDialog
    """
    if self._image_export_dialog is None:
        self._image_export_dialog = dialogs.MSVImageExportDialog(self)
    return self._image_export_dialog

@property
def seq_export_dialog(self):
    """
    Delay instantiation of the sequence export dialog until it is needed to
    prevent unnecessary file handles from being opened

    :return: the sequence export dialog
    :rtype: dialogs.MSVSequenceExportDialog
    """
    if self._seq_export_dialog is None:
        self._seq_export_dialog = dialogs.MSVSequenceExportDialog(self)
    return self._seq_export_dialog
def openPropertyDialog(self):
    """
    Open, as a modal dialog, the panel that controls which sequence
    properties are shown in the alignment metrics view.
    """
    page_model = self.model
    dialog = dialogs.ManagePropertiesDialog(
        page_model.aln, page_model.options.sequence_properties, self)
    dialog.seq_props_updated.connect(self._onSeqPropsUpdated)
    dialog.adjustSize()
    dialog.run(modal=True)
@QtCore.pyqtSlot(list)
def _onSeqPropsUpdated(self, seq_props):
    """
    Store the sequence properties chosen in the manage properties dialog in
    the options model.

    :param seq_props: The new list of sequence properties
    :type seq_props: list
    """
    self.model.options.sequence_properties = seq_props
    # TODO: LOOK AT ME
    # The changed signal is emitted explicitly in addition to the assignment
    # above. NOTE(review): presumably because the assignment alone does not
    # always emit it - confirm.
    self.model.options.sequence_propertiesChanged.emit(
        self.model.options.sequence_properties)

@QtCore.pyqtSlot(object)
@QtCore.pyqtSlot(gui_alignment.GuiProteinAlignment)
def _onAlnChanged(self, aln):
    """
    Refresh status models and menu state for a new alignment and pass the
    alignment on to the table model.

    :param aln: The new alignment
    :type aln: gui_alignment.GuiProteinAlignment
    """
    self._updateSeqStatusModel()
    self._updateResStatusAndEditStatus()
    self.updateRemoveHighlightsEnabled()
    self._onAlnSetChanged()
    # `undo_stack` is None while `__init__` is still running
    if self.undo_stack is not None:
        # TODO remove check after MSV-2239
        aln.setUndoStack(self.undo_stack)
    self._table_model.setAlignment(aln)
def setStructureModel(self, model=None):
    """
    Set the structure model used by this widget and its table model.

    :param model: The structure model to use. If None, a new
        `structure_model.StructureModel` is created with this widget and its
        current undo stack.
    :type model: structure_model.StructureModel or None
    """
    struc_model = model
    if struc_model is None:
        struc_model = structure_model.StructureModel(self, self.undo_stack)
    self._structure_model = struc_model
    self._structure_model.workspaceColorsChanged.connect(
        self.onWorkspaceColorsChanged)
    self._table_model.setStructureModel(struc_model)
def getShownSplitSequences(self):
    """
    Collect the sequences of the split alignment whose shown state is
    truthy, in alignment order.

    :return: The currently shown single-chain sequences
    :rtype: tuple
    """
    split_aln = self.model.split_aln
    # compress() keeps the items whose matching selector is truthy
    return tuple(itertools.compress(split_aln, split_aln.getSeqShownStates()))
def getSelectedSequences(self):
    """
    Get the selected sequences of the current alignment, in the order they
    appear in the alignment.

    :return: list of selected sequences
    :rtype: list of `sequence.Sequence`
    """
    return self.getAlignment().getSelectedSequences()
def setSelectedSequences(self, seqs):
    """
    Replace the current sequence selection with the given sequences. Any
    existing selection is cleared first; an empty `seqs` just clears it.

    :param seqs: List of sequences to select
    :type seqs: list of `schrodinger.protein.sequence.Sequence`
    """
    selection_model = self.getAlignment().seq_selection_model
    selection_model.clearSelection()
    if not seqs:
        return
    selection_model.setSelectionState(seqs, True)
def selectAllSequences(self):
    """
    Select every sequence in the alignment.
    """
    every_seq = self.getAlignment()
    self.setSelectedSequences(every_seq)
def deselectAllSequences(self):
    """
    Clear the sequence selection.
    """
    nothing = []
    self.setSelectedSequences(nothing)
def selectSequencesWithStructure(self):
    """
    Make the selection consist of exactly the sequences that have a
    structure.
    """
    self.setSelectedSequences(
        [seq for seq in self.getAlignment() if seq.hasStructure()])
def selectSequenceByIdentity(self):
    """
    Run the dialog for selecting sequences by identity.
    """
    dialog = dialogs.SelectByIdentityDialog(parent=self)
    dialog.run()
def invertSequenceSelection(self):
    """
    Select every sequence that is currently unselected and deselect every
    sequence that is currently selected.
    """
    alignment = self.getAlignment()
    currently_selected = set(alignment.getSelectedSequences())
    self.setSelectedSequences(
        [seq for seq in alignment if seq not in currently_selected])
def selectAntibodyHeavyChain(self):
    """
    Select exactly the sequences whose annotations report an antibody heavy
    chain.
    """
    self.setSelectedSequences([
        seq for seq in self.getAlignment()
        if seq.annotations.isAntibodyHeavyChain()
    ])
def selectAntibodyLightChain(self):
    """
    Select exactly the sequences whose annotations report an antibody light
    chain.
    """
    self.setSelectedSequences([
        seq for seq in self.getAlignment()
        if seq.annotations.isAntibodyLightChain()
    ])
def selectAllResidues(self):
    """
    Select every residue via the alignment's residue selection model.
    """
    self.getAlignment().res_selection_model.selectAll()
def deselectAllResidues(self):
    """
    Clear the residue selection via the alignment's residue selection
    model.
    """
    self.getAlignment().res_selection_model.clearSelection()
def selectResiduesWithStructure(self):
    """
    Ask the alignment to select all residues with structure.
    """
    alignment = self.getAlignment()
    alignment.selectResiduesWithStructure()
def selectAntibodyCDR(self):
    """
    Ask the alignment to select antibody CDR residues, using the CDR scheme
    that is currently set in the options model.
    """
    cdr_scheme = self.options_model.antibody_cdr_scheme
    self.getAlignment().selectAntibodyCDR(scheme=cdr_scheme)
def selectColsWithStructure(self):
    """
    Ask the alignment to select all columns that contain only structured
    residues.
    """
    alignment = self.getAlignment()
    alignment.selectColsWithStructure()
def selectBindingSites(self):
    """
    Ask the alignment to select residues with binding site contacts.
    """
    alignment = self.getAlignment()
    alignment.selectBindingSites()
def selectProteinInterface(self):
    """
    Select the residues of the alignment that are part of a protein-protein
    interface, as reported by
    `protein_protein_interactions.get_interface_atoms` for all structures
    of the alignment.
    """
    aln = self.getAlignment()
    atoms = protein_protein_interactions.get_interface_atoms(
        list(aln.all_structures))

    # Reduce the atoms to unique (residue, entry id) pairs before building
    # the keys
    unique_res_info = {(atom.getResidue(), atom.entry_id) for atom in atoms}
    interface_keys = {
        residue.get_structure_residue_key(*res_info)
        for res_info in unique_res_info
    }

    to_select = [
        res for res in aln.getResiduesWithStructure()
        if residue.get_residue_key(res, res.sequence.entry_id,
                                   res.sequence.chain) in interface_keys
    ]
    aln.res_selection_model.setSelectionState(to_select, True)
def deselectGaps(self):
    """
    Ask the alignment to deselect gaps.
    """
    alignment = self.getAlignment()
    alignment.deselectGaps()
def invertResSelection(self):
    """
    Ask the alignment to invert its residue selection.
    """
    alignment = self.getAlignment()
    alignment.invertResSelection()
@prompt_if_seqs_hidden
def deleteRedundantSeqs(self):
    """
    Run the dialog for defining and removing redundant sequences. The
    dialog is referenced from the widget only while it is executing.
    """
    dialog = dialogs.DeleteRedundantSeqsDialog(self)
    self._del_redundant_dlg = dialog
    dialog.exec()
    self._del_redundant_dlg = None
def deleteAllPredictions(self):
    """
    Delete all predictions from every sequence in the alignment.
    """
    for cur_seq in self.getAlignment():
        cur_seq.deleteAllPredictions()
@QtCore.pyqtSlot()
def deleteAnnsFromAll(self):
    """
    Delete the selected annotations from all sequences.

    Selected Pfam annotations are cleared from every sequence that has the
    same Pfam name. Selected prediction annotation types are removed from
    the shown predicted annotations, as is the Pfam type once no sequence
    is found to still have Pfam data.
    """
    opts = self.options_model
    selection = self.model.aln.ann_selection_model.getSelection()

    names_to_clear = set()
    types_to_hide = set()
    for info in selection:
        ann_type = info.ann
        if ann_type is SEQ_ANNO_TYPES.pfam:
            name = info.seq.pfam_name
            if name is not None:
                names_to_clear.add(name)
        elif ann_type in PRED_ANNO_TYPES:
            types_to_hide.add(ann_type)

    if names_to_clear:
        pfam_remaining = False
        for seq in self.getAlignment():
            if seq.pfam_name in names_to_clear:
                seq.clearPfam()
            elif not pfam_remaining and seq.hasPfam():
                pfam_remaining = True
        if not pfam_remaining:
            types_to_hide.add(SEQ_ANNO_TYPES.pfam)

    # Hide specified prediction types
    opts.predicted_annotations = opts.predicted_annotations - types_to_hide
@QtCore.pyqtSlot()
def clearAnnotations(self):
    """
    Hide every selected annotation that is not a predicted annotation, by
    removing it from the sequence, alignment and residue propensity
    annotation options.
    """
    opts = self.options_model
    selection = self.model.aln.ann_selection_model.getSelection()
    to_hide = {
        info.ann for info in selection if info.ann not in PRED_ANNO_TYPES
    }
    shown_ann_sets = (
        opts.sequence_annotations,
        opts.alignment_annotations,
        opts.residue_propensity_annotations,
    )
    for shown_anns in shown_ann_sets:
        # Explicitly checking intersection first because params in-place
        # operations skip equality checks for performance reasons and
        # always emit changed signals
        if not shown_anns & to_hide:
            continue
        shown_anns.difference_update(to_hide)
@QtCore.pyqtSlot(object)
def _updatePickMode(self, pick_mode):
    """
    When the pick mode is changed, modify the alignment as needed

    :param pick_mode: The new pick mode
    :type pick_mode: PickMode or None
    """
    aln = self.getAlignment()
    # Constraints are shown for proximity picking, and for pairwise picking
    # (see below)
    show_constraints = pick_mode is PickMode.HMProximity
    if pick_mode is PickMode.HMChimera:
        self.model.options.color_by_aln = viewconstants.ColorByAln.Matching
        # Turning off chimera is handled by homology panel
        self.view.setChimeraShown(True)
    elif pick_mode is PickMode.Pairwise:
        # NOTE(review): the nesting of this branch was reconstructed from
        # whitespace-collapsed source - confirm that all four statements
        # belong under the length check.
        if len(aln) > 1:
            ref_seq = aln.getReferenceSeq()
            self.moveSelectedSequences(viewconstants.Direction.Top)
            self.setSequenceExpansionState([ref_seq])
            show_constraints = True
    self.view.setConstraintsShown(show_constraints)
    self.view.setLigandConstraintsShown(pick_mode is PickMode.HMBindingSite)
def resetPick(self, pick_mode):
    """
    Reset the alignment to the default pick for the given pick mode. Does
    nothing if `pick_mode` is None.

    :param pick_mode: The pick mode to reset
    :type pick_mode: PickMode or None
    """
    if pick_mode is not None:
        picking.handle_reset_pick(self.getAlignment(), pick_mode)
@QtCore.pyqtSlot()
def clearConstraints(self):
    """
    Reset the pairwise constraints of the alignment.
    """
    alignment = self.getAlignment()
    alignment.resetPairwiseConstraints()
@QtCore.pyqtSlot()
def _onCompositeResiduesChanged(self):
    """
    Stop picking (by clearing the pick mode) once the alignment no longer
    has any homology composite residues.
    """
    composite_residues = self.getAlignment().homology_composite_residues
    if composite_residues:
        return
    self.model.options.pick_mode = None

# ==========================================================================
# Menu enabling
# ==========================================================================
@QtCore.pyqtSlot()
def updateCanUnlinkSequences(self):
    """
    Allow unlinking only outside of the workspace tab and only if at least
    one selected sequence has a structure.
    """
    can_unlink = False
    if not self.isWorkspace():
        selected = self.getAlignment().seq_selection_model.getSelection()
        can_unlink = any(seq.hasStructure() for seq in selected)
    self.model.menu_statuses.can_unlink_sequences = can_unlink
@QtCore.pyqtSlot()
def updateDeleteSequencesEnabled(self):
    """
    Allow deleting sequences only outside of the workspace tab and only if
    there is a sequence selection.
    """
    can_delete = False
    if not self.isWorkspace():
        selection_model = self.getAlignment().seq_selection_model
        can_delete = selection_model.hasSelection()
    self.model.menu_statuses.can_delete_sequences = can_delete
@QtCore.pyqtSlot()
def updateMoveSequencesEnabled(self):
    """
    Allow moving sequences only if something is selected and the reference
    sequence is not part of the selection.
    """
    alignment = self.getAlignment()
    reference = alignment.getReferenceSeq()
    selection = alignment.seq_selection_model.getSelection()
    ref_is_selected = any(seq == reference for seq in selection)
    movable = bool(selection) and not ref_is_selected
    self.model.menu_statuses.can_move_sequence = movable
@QtCore.pyqtSlot()
def updateSetAsRefEnabled(self):
    """
    Update the menu statuses for Set As Reference Sequence and for
    duplicating as reference. Setting as reference requires exactly one
    selected sequence that isn't already the reference; duplicating as
    reference additionally requires not being in the workspace tab.
    """
    alignment = self.getAlignment()
    selected = alignment.getSelectedSequences()
    reference = alignment.getReferenceSeq()
    can_set_ref = len(selected) == 1 and selected[0] != reference

    statuses = self.model.menu_statuses
    statuses.can_set_as_ref = can_set_ref
    statuses = self.model.menu_statuses
    statuses.can_duplicate_as_ref = can_set_ref and not self.isWorkspace()
@QtCore.pyqtSlot()
def updateRenameSeqEnabled(self):
    """
    Allow renaming only when exactly one sequence is selected.
    """
    num_selected = len(self.getAlignment().getSelectedSequences())
    self.model.menu_statuses.can_rename_seq = num_selected == 1
def updateDuplicateSeqOptionsEnabled(self):
    """
    Allow duplicating when there is a sequence selection. Duplicating into
    the same tab is additionally disallowed in the workspace tab.
    """
    has_selection = self.getAlignment().seq_selection_model.hasSelection()
    statuses = self.model.menu_statuses
    statuses.can_duplicate_sequence = has_selection
    statuses.can_duplicate_seq_same_tab = (not self.isWorkspace() and
                                           has_selection)
@QtCore.pyqtSlot()
def _updateRenumberResEnabled(self):
    """
    Enable the 'Renumber Residues...' menu item only when the widget's
    alignment is truthy, i.e. there is a sequence in it.
    """
    can_renumber_residues = bool(self.getAlignment())
    self.model.menu_statuses.can_renumber_residues = can_renumber_residues


@QtCore.pyqtSlot()
def _updateEditSequenceAsTextEnabled(self):
    """
    Enable the 'Edit Sequence as Plain Text...' menu item only when this is
    not the workspace widget, exactly one sequence is selected, and that
    sequence has no structure and is not in an alignment set.
    """
    aln = self.getAlignment()
    sel_seqs = aln.seq_selection_model.getSelection()
    if self.isWorkspace() or len(sel_seqs) != 1:
        can_edit_sequence = False
    else:
        # Read the single selected sequence by unpacking rather than
        # `pop()`, so the container handed back by the selection model is
        # never mutated by a pure status query.
        (sel_seq,) = sel_seqs
        can_edit_sequence = (not sel_seq.hasStructure() and
                             aln.alnSetForSeq(sel_seq) is None)
    self.model.menu_statuses.can_edit_as_text = can_edit_sequence
@QtCore.pyqtSlot()
def updateSelectMenuEnabled(self):
    """
    Update `can_select`: true when the widget's alignment is truthy.
    """
    alignment = self.getAlignment()
    self.model.menu_statuses.can_select = bool(alignment)
@QtCore.pyqtSlot()
def updateSelectProteinInterfaceMenuEnabled(self):
    """
    Update `can_select_protein_interface`.

    False for a falsy alignment. Otherwise true on the workspace widget,
    and on other widgets true only when some sequence has a structure.
    """
    alignment = self.getAlignment()
    if not alignment:
        enabled = False
    elif self.isWorkspace():
        enabled = True
    else:
        enabled = any(seq.hasStructure() for seq in alignment)
    self.model.menu_statuses.can_select_protein_interface = enabled
@QtCore.pyqtSlot()
def updateExpandEnabled(self):
    """
    Update `can_expand`: true unless rows are grouped by type.
    """
    grouping = self.options_model.group_by
    expandable = grouping is not viewconstants.GroupBy.Type
    self.model.menu_statuses.can_expand = expandable
@QtCore.pyqtSlot()
def updateAnchorSelectionEnabled(self):
    """
    Update `can_anchor_res` and `can_unanchor_res` from the residue
    selection.

    Anchoring is offered when residues are selected, the alignment reports
    the anchor selection as valid, and not every selected residue is
    already anchored. Unanchoring is offered when any selected residue is
    anchored.
    """
    alignment = self.getAlignment()
    selection = alignment.res_selection_model.getSelection()
    anchored = alignment.getAnchoredResiduesWithRef()
    anchored_in_selection = selection.intersection(anchored)
    everything_anchored = len(anchored_in_selection) == len(selection)
    anchorable = (bool(len(selection)) and
                  alignment._anchorSelectionValid() and
                  not everything_anchored)
    statuses = self.model.menu_statuses
    statuses.can_anchor_res = anchorable
    statuses.can_unanchor_res = bool(anchored_in_selection)
@QtCore.pyqtSlot()
def updateRemoveHighlightsEnabled(self):
    """
    Update `can_remove_highlights`: true when any selected residue is a key
    of the alignment's highlight color map.
    """
    alignment = self.getAlignment()
    highlighted = alignment.getHighlightColorMap()
    selection = alignment.res_selection_model.getSelection()
    removable = any(res in highlighted for res in selection)
    self.model.menu_statuses.can_remove_highlights = removable
def updateHideColumnsEnabled(self):
    """
    Placeholder: currently does nothing.
    """
    # TODO hiding isn't enabled yet
    # Enable if only full columns are selected
    return None
@QtCore.pyqtSlot()
def updateAlignSelectedSeqsEnabled(self):
    """
    Update `can_align_selected_seqs`: true when the alignment has more than
    one sequence and at least one selected sequence other than the
    reference.
    """
    alignment = self.getAlignment()
    non_ref_selection = alignment.seq_selection_model.getSelection()
    reference = alignment.getReferenceSeq()
    # The reference does not count towards "sequences to align"
    non_ref_selection.discard(reference)
    can_align = len(alignment) > 1 and len(non_ref_selection) > 0
    self.model.menu_statuses.can_align_selected_seqs = can_align
@QtCore.pyqtSlot()
def _updateClearConstraintsEnabled(self):
    """
    Update `can_clear_constraints` from the alignment's pairwise
    constraints.
    """
    constraints = self.getAlignment().pairwise_constraints
    self.model.menu_statuses.can_clear_constraints = (
        constraints.hasConstraints())


@QtCore.pyqtSlot()
def _onAlnSetChanged(self):
    """
    Refresh every status that depends on alignment sets.
    """
    refreshers = (
        self._updateAlnSets,
        self._updateRemoveFromAlnSetEnabled,
        self._updateAlignAlnSetEnabled,
        self._updateResEditStatuses,
        self._updateEditSequenceAsTextEnabled,
    )
    for refresh in refreshers:
        refresh()


@QtCore.pyqtSlot()
def _updateAlignAlnSetEnabled(self):
    """
    Update the alignment-set alignment statuses from the mode reported by
    `validate_align.get_aln_set_align_mode`.
    """
    mode = validate_align.get_aln_set_align_mode(self.getAlignment())
    modes = viewconstants.SeqAlnMode
    multiple_only = mode is modes.Multiple
    profile_only = not multiple_only and mode is modes.Profile
    # A falsy mode that is neither Multiple nor Profile disables alignment
    any_align = multiple_only or profile_only or bool(mode)
    menu = self.model.menu_statuses
    menu.can_only_multiple_align = multiple_only
    menu.can_only_profile_align = profile_only
    menu.can_aln_set_align = any_align


@QtCore.pyqtSlot()
def _updateRemoveFromAlnSetEnabled(self):
    """
    Update `can_remove_from_aln_set`: true when any selected sequence
    belongs to an alignment set.
    """
    alignment = self.getAlignment()
    selection = alignment.seq_selection_model.getSelection()
    in_a_set = any(
        alignment.alnSetForSeq(seq) is not None for seq in selection)
    self.model.menu_statuses.can_remove_from_aln_set = in_a_set


def _updateAlnSets(self):
    """
    Update the list of alignment set names. This list is used to populate
    "Alignment Set -> Add to Set" in the sequence context menu.

    The stored value is only reassigned when the names differ.
    """
    names = self.getAlignment().alnSetNames()
    menu = self.model.menu_statuses
    if names != menu.aln_set_names:
        menu.aln_set_names = names


@QtCore.pyqtSlot()
def _updateCanAlignBindingSites(self):
    """
    Update whether a binding site alignment can be run.

    At least 3 residues must be selected and all of them must belong to the
    reference sequence.
    """
    alignment = self.getAlignment()
    selection = alignment.res_selection_model.getSelection()
    runnable = False
    if len(selection) >= 3:
        reference = alignment.getReferenceSeq()
        on_reference = {
            res for res in selection if res.sequence == reference
        }
        runnable = selection == on_reference
    self.model.menu_statuses.can_align_binding_site = runnable

# ==========================================================================
# Helpers
# ==========================================================================

def _getNewAlignment(self):
    """
    Return an alignment appropriate for the widget (ProteinAlignment, etc.)
    with a reference to the widget's undo stack set on it.

    Must be implemented by subclasses.

    :return: `schrodinger.application.msv.alignment.Alignment`
    """
    raise NotImplementedError(
        "Subclasses of AbstractMsvWidget must return an appropriate "
        "alignment here.")


def _getAlignmentView(self, undo_stack):
    """
    Return an alignment view appropriate for the widget.

    Must be implemented by subclasses.

    :param undo_stack: The undo stack to pass to the view.
    :type undo_stack: schrodinger.application.msv.command.UndoStack

    :rtype: view.AbstractAlignmentView
    """
    raise NotImplementedError(
        "Subclasses of AbstractMsvWidget must return an appropriate "
        "alignment view here.")


def _getNoScrollBarView(self):
    """
    Return an alignment view without scroll bars.

    Must be implemented by subclasses.
    """
    raise NotImplementedError(
        "Subclasses of AbstractMsvWidget must return an appropriate "
        "no scrollbar alignment view here.")


def _getAlignmentInfoView(self):
    """
    Return an alignment view for the alignment info columns on the left
    side.

    Must be implemented by subclasses.
    """
    raise NotImplementedError(
        "Subclasses of AbstractMsvWidget must return an appropriate "
        "alignment info view here.")


def _getAlignmentMetricsView(self):
    """
    Return an alignment view for the "frozen" metrics columns on the right
    side.

    Must be implemented by subclasses.
    """
    raise NotImplementedError(
        "Subclasses of AbstractMsvWidget must return an appropriate "
        "alignment metrics view here.")


def _getAlignmentViewModel(self):
    """
    Return an alignment viewmodel appropriate for the widget.

    Must be implemented by subclasses.
    """
    raise NotImplementedError(
        "Subclasses of AbstractMsvWidget must return an appropriate "
        "alignment viewmodel here.")

# ==========================================================================
# External API
# ==========================================================================
@qt_utils.wait_cursor
def generatePfam(self):
    """
    Queue a Pfam task for each selected sequence, or for every sequence in
    the alignment when nothing is selected, and start the queue if it is
    not already running.

    Does nothing when there are no sequences to process.
    """
    aln = self.getAlignment()
    if aln.seq_selection_model.hasSelection():
        seqs = aln.seq_selection_model.getSelection()
    else:
        seqs = list(aln)
    if not seqs:
        # Without this guard an empty alignment would create and announce
        # an empty queue and then fail on the unbound loop variable `task`
        # in the status check below.
        return
    if self._pfam_task_queue is None:
        task_queue = MsvTaskQueue(description="Finding Pfam sequences...",
                                  max_running_tasks=1)
        task_queue.queueDone.connect(self._onPfamFinished)
        self._pfam_task_queue = task_queue
    for seq in seqs:
        task = pfam.PfamTask()
        task.input.seq = seq
        self._pfam_task_queue.addTask(task)
    self.taskStarted.emit(self._pfam_task_queue)
    # `task` is always bound here because `seqs` is non-empty
    if self._pfam_task_queue.status != task.RUNNING:
        self._pfam_task_queue.start()
@QtCore.pyqtSlot()
def _onPfamFinished(self):
    """
    Store the Pfam results of finished tasks on their sequences, drop the
    task queue, warn about failed tasks, and mark the pfam annotation as
    predicted.
    """
    failures = 0
    for finished in self._pfam_task_queue.getTasks():
        status = finished.status
        if status is finished.DONE:
            output = finished.output
            finished.input.seq.setPfam(output.pfam, output.name)
        elif status is finished.FAILED:
            failures += 1
    self._pfam_task_queue = None
    if failures:
        engine = inflect.engine()
        self.warning(f"Pfam failed for {engine.no('sequence', failures)}")
    self.model.options.predicted_annotations.add(SEQ_ANNO_TYPES.pfam)
def generatePredictionAnnotation(self, anno):
    """
    Generate predictions for a single prediction annotation on the selected
    sequences.

    :param anno: The annotation to predict; should be a key in
        `PRED_ANNO_TO_PRED_FUNC`.
    """
    requested = [anno]
    self._runPredictionsForSelectedSeqs(requested)
def generateAllPredictions(self):
    """
    Generate predictions for every key of `PRED_ANNO_TO_PRED_FUNC` on the
    selected sequences.
    """
    every_anno = [*PRED_ANNO_TO_PRED_FUNC]
    self._runPredictionsForSelectedSeqs(every_anno)
def _runPredictionsForSelectedSeqs(self, annos):
    """
    Run the given predictions for every selected sequence of the split
    alignment, warning instead when nothing is selected.
    """
    targets = self.model.split_aln.getSelectedSequences()
    if targets:
        for target in targets:
            self._runPredictionsForSeq(target, annos)
    else:
        self.warning("Select a sequence to generate predictions for.")


def _runPredictionsForSeq(self, seq, annos):
    """
    Run a BLAST task for `seq` and, if it finished, run one predictor task
    per annotation using the BLAST alignment.

    Disulfide bond prediction is skipped for sequences with fewer than two
    'C' residues.
    """
    blast_task = self._runBlastForPredictions(seq)
    if blast_task.status is not blast_task.DONE:
        return
    blast_aln = blast_task.getBlastAlignment()
    for anno in annos:
        if anno is SEQ_ANNO_TYPES.pred_disulfide_bonds:
            n_cys = sum(1 for res in seq.residues() if str(res) == 'C')
            if n_cys < 2:
                continue
        predictor = predictors.PredictorWrapperTask(anno, seq, blast_aln)
        self.taskStarted.emit(predictor)
        predictor.start()
        predictor.wait()  # TODO PANEL-18317
        self.model.options.predicted_annotations.add(anno)


def _runBlastForPredictions(self, seq, location=blast.LOCAL):
    """
    Create a BLAST task for `seq`, emit
    `startPredictorBlastTaskRequested` with it, and return the task.
    """
    task = blast.BlastTask()
    task_input = task.input
    task_input.settings.location = location
    task_input.query_sequence = seq
    self.startPredictorBlastTaskRequested.emit(task)
    return task
def undo(self):
    """
    Undo the most recent command on the widget's undo stack.
    """
    stack = self.undo_stack
    stack.undo()
def redo(self):
    """
    Redo the most recently undone command on the widget's undo stack.
    """
    stack = self.undo_stack
    stack.redo()
def resetUndoStack(self):
    """
    Clear the widget's undo stack.

    This needs to be called when we perform an operation that gets tangled
    up with Maestro's single step undo, so that permitting an undo from msv
    would result in data corruption.
    """
    stack = self.undo_stack
    stack.clear()
def anchorSelection(self):
    """
    Anchor all unanchored residues in the selection (delegates to the
    view).
    """
    view_widget = self.view
    view_widget.anchorSelection()
def clearAnchored(self):
    """
    Remove anchors from all anchored residues in the selection (delegates
    to the view).
    """
    view_widget = self.view
    view_widget.unanchorSelection()
def getAlignment(self):
    """
    Return the alignment stored on the widget's model.

    :return: The widget's alignment
    :rtype: `schrodinger.protein.alignment.BaseAlignment`
    """
    widget_model = self.model
    return widget_model.aln
def importSeq(self, seq, index=None):
    """
    Import a single sequence by delegating to `importSeqs`.

    :param seq: The sequence to add
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :param index: Index to import the seq at
    :type index: int or None
    """
    single = [seq]
    self.importSeqs(single, index)
def importSeqs(self, seqs, index=None, replace_selection=True):
    """
    Add sequences to the widget's alignment and, when auto-align is on and
    the alignment already had one or more sequences, align the new ones.

    .. WARNING::
        Avoid calling this method from unittests as it adds a maestro
        test-time dependency

    :param seqs: Sequences to add
    :type seqs: iterable(schrodinger.protein.sequence.ProteinSequence)

    :param index: Index to import the seqs at (must be 1 or greater)
    :type index: int or None

    :param replace_selection: Forwarded to the alignment's `addSeqs`.
    :type replace_selection: bool

    :raise RuntimeError: If called on the workspace widget or with an
        index of 0.
    """
    if self.isWorkspace():
        raise RuntimeError("Cannot add sequences to the workspace widget. "
                           "Add entries to the workspace instead.")
    if index == 0:
        raise RuntimeError("Cannot import sequence as the reference.")

    alignment = self.getAlignment()
    n_before = len(alignment)
    new_seqs = list(seqs)
    alignment.addSeqs(new_seqs,
                      index=index,
                      replace_selection=replace_selection)

    if not self.options_model.auto_align:
        return
    if not self.model.split_chain_view:
        # In combined-chain mode the alignment holds combined-chain
        # sequences while `new_seqs` holds split-chain ones, so fetch the
        # newly added sequences from the alignment itself.
        new_seqs = alignment[n_before:]
        if not new_seqs:
            # Every chain was merged into an existing combined-chain
            # sequence, so there is nothing to align
            return
    if not n_before:
        # There was no reference sequence to align against
        return
    if len(new_seqs) >= 20:
        prompt = dialogs.TooManyAutoAlignDialog(len(new_seqs), self)
        if not prompt.exec():
            return
    self._alignNewSeqs(new_seqs, pairwise=n_before > 1)
def _alignNewSeqs(self, new_seqs, pairwise=True):
    """
    Align newly added sequences as a single undoable command, with sequence
    and residue selection suspended.

    :param new_seqs: The sequences that were just added
    :param pairwise: Whether to pairwise align each new sequence to the
        reference sequence or multiple align the entire alignment
    :type pairwise: bool

    :raise RuntimeError: If the alignment has fewer than 2 sequences.
    """
    alignment = self.getAlignment()
    if len(alignment) < 2:
        raise RuntimeError("Can't align fewer than 2 seqs")
    seq_selection = alignment.seq_selection_model
    res_selection = alignment.res_selection_model
    with seq_selection.suspendSelection(), \
            res_selection.suspendSelection():
        if not pairwise:
            description = ("(Auto) Multiple Alignment")
            with command.compress_command(self.undo_stack, description):
                self.multipleAlignment()
            return
        self.setSelectedSequences(new_seqs)
        description = ("(Auto) Pairwise Alignment with Locked Reference "
                       " Gaps")
        with command.compress_command(self.undo_stack, description):
            self.mergedPairwiseAlignment()
            alignment.minimizeAlignment()


def _syncWsResSelectionForSeqs(self, seqs):
    """
    Synchronize residue selection with the workspace for sequences that
    were just added to the alignment.

    :param seqs: The sequences to select residues in
    :type seqs: Iterable(sequence.Sequence)
    """
    # Residue selection synchronization for the workspace tab is handled
    # inside the structure model. Skipping it here also prevents a crash
    # due to MAE-44891.
    if not self.isWorkspace():
        alignment = self.getAlignment()
        self._structure_model.applyWorkspaceSelectionToSeqs(alignment, seqs)


def _checkCanDeleteSeqs(self) -> bool:
    """
    Ask the user for confirmation when the selected sequences include the
    reference sequence.

    :return: True when the reference is not selected; otherwise the answer
        returned by `self.question`.
    """
    alignment = self.getAlignment()
    selected = alignment.getSelectedSequences()
    reference = alignment.getReferenceSeq()
    if reference not in selected:
        return True
    if len(selected) == len(alignment):
        return self.question("Delete all sequences?")
    if alignment.getAnchoredResidues():
        anchor_text = "remove all anchors and "
    else:
        anchor_text = ""
    return self.question(
        f"Deleting the Reference sequence will {anchor_text}cause "
        "the first available sequence to become the Reference. "
        "Continue anyway?",
        title="Reference Sequence Selected")
@QtCore.pyqtSlot()
def removeSelectedSeqs(self):
    """
    Remove the currently selected sequences from the alignment, if any.
    """
    selected = self.getAlignment().getSelectedSequences()
    if selected:
        self.removeSequences(selected)
def removeSequences(self, seqs):
    """
    Remove the specified sequences.

    When the reference sequence is among them, the user is asked to confirm
    (via `_checkCanDeleteSeqs`), the next reference sequence is shown if it
    is hidden, and all anchors are cleared, as one undoable command.

    :param seqs: List of sequences to be removed
    :type seqs: list(sequence.Sequence)

    :raise RuntimeError: If called on the workspace widget.
    """
    if self.isWorkspace():
        raise RuntimeError("Cannot remove sequences from the workspace "
                           "widget. Remove entries from the workspace "
                           "instead.")
    alignment = self.getAlignment()
    reference = alignment.getReferenceSeq()
    if reference not in seqs:
        alignment.removeSeqs(seqs)
        return
    if not self._checkCanDeleteSeqs():
        return

    n_removed = len(seqs)
    if n_removed == len(alignment):
        description = "Remove All Sequences"
    elif n_removed == 1:
        description = "Remove Reference Sequence"
    else:
        n_others = n_removed - 1
        other_text = inflect.engine().plural("Other", n_others)
        description = "Remove Reference Sequence and " \
            f"{n_others} {other_text}"

    with command.compress_command(self.undo_stack, description):
        if alignment.anyHidden():
            # The first surviving non-reference sequence becomes the new
            # reference; make sure it is visible.
            for candidate in itertools.islice(alignment, 1, None):
                if candidate in seqs:
                    continue
                if alignment.isSeqHidden(candidate):
                    alignment.showSeqs([candidate])
                break
        alignment.clearAnchors()
        alignment.removeSeqs(seqs)
def _checkCanSetReferenceSeq(self) -> bool:
    """
    Confirm a reference change with the user when anchors are present.

    :return: True when there are no anchored residues; otherwise the answer
        returned by `self.question`.
    """
    if self.getAlignment().getAnchoredResidues():
        return self.question(
            "Changing the Reference sequence will remove all anchors. "
            "Continue anyway?",
            title="Anchors Present")
    return True
@QtCore.pyqtSlot()
def setSelectedSeqAsReference(self):
    """
    Set the selected sequence as the reference sequence.

    Nothing happens unless exactly one sequence is selected, it is not
    already the reference, and `_checkCanSetReferenceSeq` allows the
    change. Anchors are cleared as part of the same undoable command.
    """
    selected = self.getSelectedSequences()
    if len(selected) != 1:
        return
    if self.getAlignment().isReferenceSeq(selected[0]):
        return
    if not self._checkCanSetReferenceSeq():
        return
    target = selected[0]
    description = f"Set {target.fullname} as Reference Sequence"
    with command.compress_command(self.undo_stack, description):
        self.getAlignment().clearAnchors()
        self._setReferenceSeq(target)
def _setReferenceSeq(self, seq):
    """
    Set the specified sequence as the alignment's reference sequence.

    :param seq: The sequence to set as the reference seq
    :type seq: schrodinger.protein.sequence.ProteinSequence
    """
    alignment = self.getAlignment()
    alignment.setReferenceSeq(seq)
@QtCore.pyqtSlot()
def selectResForSeqs(self):
    """
    Select all residues of the selected sequences.
    """
    alignment = self.getAlignment()
    alignment.setResSelectionStateForSelectedSeqs(True)
@QtCore.pyqtSlot()
def deselectResForSeqs(self):
    """
    Deselect all residues of the selected sequences.
    """
    alignment = self.getAlignment()
    alignment.setResSelectionStateForSelectedSeqs(False)
@QtCore.pyqtSlot()
def blastSelectedSeq(self):
    """
    Set the selected sequence as the reference sequence, then open the
    BLAST search dialog.
    """
    steps = (self.setSelectedSeqAsReference, self.openBlastSearchDialog)
    for step in steps:
        step()
def expandSelectionAlongCols(self):
    """
    Expand the current residue selection to include all residues in the
    selected residues' columns.
    """
    alignment = self.getAlignment()
    alignment.expandSelectionAlongColumns()
def expandSelectionReferenceOnly(self):
    """
    Select column residues for all columns that have a non-gap reference.
    """
    alignment = self.getAlignment()
    alignment.expandSelectionFromReference()
def selectIdentityColumns(self):
    """
    Select the residues in the identity columns of the alignment.
    """
    alignment = self.getAlignment()
    alignment.selectIdentityColumns()
def selectAlignedResidues(self):
    """
    Select residues in columns containing no gaps.
    """
    alignment = self.getAlignment()
    alignment.selectAlignedResidues()
def openBlastSearchDialog(self):
    """
    Hook for a sequence-type-specific BLAST dialog; child classes implement
    it. This base implementation does nothing.
    """
    return None
@QtCore.pyqtSlot(object)
def moveSelectedSequences(self, direction):
    """
    Move the currently selected sequences in the specified direction.

    For the Up and Down directions `dialogs.prompt_for_hidden_seqs` is
    consulted first, and nothing is moved when it returns a truthy value.

    :param direction: Direction to move items.
    :type direction: viewconstants.Direction
    """
    stepwise = (viewconstants.Direction.Down, viewconstants.Direction.Up)
    if direction in stepwise:
        alignment = self.getAlignment()
        if dialogs.prompt_for_hidden_seqs(self, alignment):
            return
    selected = self.getSelectedSequences()
    self._moveSequences(selected, direction)
def _moveSequences(self, seq_list, direction):
    """
    Reorder the alignment so that the given sequences move in `direction`.

    The reference sequence (index 0) never moves: it is left out of the
    reordering and put back in first position.

    :param seq_list: Sequences to move
    :param direction: Direction to move items.
    :type direction: viewconstants.Direction
    """
    alignment = self.getAlignment()
    n_seqs = len(alignment)
    if n_seqs == 0:
        return
    movable_indices = list(range(1, n_seqs))
    chosen_indices = [
        alignment.index(seq)
        for seq in seq_list
        if not alignment.isReferenceSeq(seq)
    ]
    chosen_indices.sort()
    reordered = self.moveItems(all_items=movable_indices,
                               sel_items=chosen_indices,
                               direction=direction)
    # Put the reference back in first position
    alignment.reorderSequences([0] + reordered)
@classmethod
def moveItems(cls, all_items, sel_items, direction):
    """
    Move the specified items in the specified direction.

    :param all_items: List of unique items.
    :type all_items: list

    :param sel_items: Items to move; must be a subset of `all_items`.
    :type sel_items: list

    :param direction: Direction to move items.
    :type direction: viewconstants.Direction

    :return: Reordered copy of `all_items`
    :rtype: list

    :raise ValueError: Invalid value of `direction`, non-unique items, or
        `sel_items` is not a subset of `all_items`.
    """
    ordered = list(all_items)  # Work on a copy of the caller's list
    chosen = list(sel_items)  # A list so slices can be compared to it
    ordered_set = set(ordered)
    if len(ordered_set) != len(ordered):
        raise ValueError("`all_items` is not unique")
    chosen_set = set(chosen)
    if len(chosen_set) != len(chosen):
        raise ValueError("`sel_items` is not unique")
    if not chosen_set.issubset(ordered_set):
        raise ValueError("`sel_items` is not a subset of `all_items`")

    Direction = viewconstants.Direction
    going_up = direction is Direction.Top or direction is Direction.Up
    going_down = (direction is Direction.Bottom or
                  direction is Direction.Down)
    n_chosen = len(chosen)
    # Nothing to do if the chosen items already sit at the relevant end
    if going_up and ordered[:n_chosen] == chosen:
        return ordered
    if going_down and ordered[-n_chosen:] == chosen:
        return ordered

    final_idx = len(ordered) - 1
    if direction is Direction.Top:
        visit_order = chosen[::-1]
        destination = lambda idx: 0
    elif direction is Direction.Up:
        visit_order = chosen
        destination = lambda idx: max(0, idx - 1)
    elif direction is Direction.Down:
        visit_order = chosen[::-1]
        destination = lambda idx: min(final_idx, idx + 1)
    elif direction is Direction.Bottom:
        visit_order = chosen
        destination = lambda idx: final_idx
    else:
        raise ValueError("Invalid value for 'direction'")

    for item in visit_order:
        src = ordered.index(item)
        dst = destination(src)
        if dst == src:
            continue
        moved = ordered.pop(src)
        ordered.insert(dst, moved)
    return ordered
def getColorScheme(self):
    """
    Return the sequence color scheme reported by the table model.
    """
    table_model = self._table_model
    return table_model.getSeqColorScheme()
@QtCore.pyqtSlot(object)
def _onResidueMiddleClicked(self, res):
    """
    Forward a middle click on a residue to the structure model, but only
    when `maestro` is truthy (MSV opened from Maestro).

    :param res: clicked residue
    :type res: protein.residue.Residue
    """
    if not maestro:
        return
    self._structure_model.onResidueMiddleClicked(res)
def getShownRowTypes(self):
    """
    Return the shown row types as reported by the table model.

    :return: A list of enums representing the shown row types
    :rtype: list
    """
    table_model = self._table_model
    return table_model.getShownRowTypes()
def rowWrap(self):
    """
    Report whether rows are currently wrapped, as stored on the table
    model.

    :return: Whether the rows are currently wrapped
    :rtype: bool
    """
    table_model = self._table_model
    return table_model.rowWrap()
def groupedByType(self):
    """
    Report whether the table model groups rows by type.

    :return: Whether the rows are currently grouped by type
    :rtype: bool
    """
    current_grouping = self._table_model.getGroupBy()
    return current_grouping is viewconstants.GroupBy.Type
@QtCore.pyqtSlot()
def updateColorsEnabled(self):
    """
    Apply the options model's `colors_enabled` setting: re-apply the
    sequence color scheme when colors are on, otherwise apply a
    `NoColorScheme`.
    """
    if not self.options_model.colors_enabled:
        self.applyColorScheme(color.NoColorScheme())
        return
    self.updateSeqColorScheme()
@QtCore.pyqtSlot()
def updateSeqColorScheme(self):
    """
    Apply the sequence color scheme from the options model; for a
    `WorkspaceScheme` the workspace colors are pulled afterwards.
    """
    chosen_scheme = self.options_model.seq_color_scheme
    self.applyColorScheme(chosen_scheme)
    uses_workspace_colors = isinstance(chosen_scheme, color.WorkspaceScheme)
    if uses_workspace_colors:
        self.pullWorkspaceColors()
def showColorSchemeEditor(self):
    """
    Show the color scheme editor dialog.

    Changes made in the dialog are previewed live. The edited scheme is
    applied if the dialog is accepted; otherwise the preview is discarded.
    """
    options = self.options_model
    # Colors must be on so the user can see their edits and so that the
    # editor dialog is never handed a NoColorScheme.
    if not options.colors_enabled:
        options.colors_enabled = True
    editor = dialogs.ColorEditor(self.getColorScheme(),
                                 self.model.aln,
                                 parent=self)
    editor.schemeChanged.connect(self._previewColorScheme)
    if editor.exec():
        self.applyColorScheme(editor.getCurrentScheme())
    else:
        self._restoreColorScheme()
def setSequenceExpansionState(self, sequences=None, expand=True):
    """
    Set the expansion state for the given (or all shown) sequences.

    :param sequences: Sequences to expand or collapse. If None, use all
        shown sequences.
    :type sequences: list(Sequence)

    :param expand: Whether to expand the sequences
    :type expand: bool
    """
    if sequences is None:
        alignment = self.getAlignment()
        if not alignment.anyHidden():
            # Nothing is hidden, so the view can act on everything at once
            if expand:
                self.view.expandAll()
            else:
                self.view.collapseAll()
            return
        sequences = alignment.getShownSeqs()
    self.view.setSequenceExpansionState(sequences, expand)
@QtCore.pyqtSlot()
def translateSelectedSequences(self):
    """
    Translate the selected nucleic acid sequences.

    On the workspace widget a translation into a new tab is requested via
    `translateIntoNewTabRequested`; otherwise the sequences are translated
    in place.
    """
    selected = self.getSelectedSequences()
    if selected:
        if self.isWorkspace():
            self.translateIntoNewTabRequested.emit(selected)
        else:
            self.translateSeqsInPlace(selected)
def translateSeqsInPlace(self, seqs):
    """
    Translate the given sequences in place.

    Only `sequence.NucleicAcidSequence` instances are translated; other
    items are ignored. Each translation is imported directly after its
    source sequence and the source sequences are then removed, all as one
    undoable command.

    :param seqs: The sequences to translate
    :type seqs: iterable[sequence.NucleicAcidSequence]
    """
    msg = "Cannot translate in place on Workspace Tab"
    assert not self.isWorkspace(), msg
    seqs = [
        seq for seq in seqs if isinstance(seq, sequence.NucleicAcidSequence)
    ]
    if not seqs:
        return
    aln = self.getAlignment()
    desc = "Translate RNA/DNA Sequences"
    with command.compress_command(self.undo_stack, desc):
        # `seqs` was filtered above, so no per-item type check is needed
        for source_seq in seqs:
            seq_idx = aln.index(source_seq)
            protein_seq = source_seq.getTranslation()
            self.importSeq(protein_seq, seq_idx + 1)
        self.removeSequences(seqs)
# ========================================================================== # Command Interface # ========================================================================== # The role of the methods in this section is to create and return commands. # The main work here is in gathering the parameters with which to create # the command objects. Actual work is performed elsewhere when the command # object calls the gui with the appropriate parameters. # Operations that modify sequences and alignments do NOT need to be # performed within a command, because they use the widget's own command # stack directly.
@util.skip_if("_changing_options_model")
@command.do_command()
def applyColorScheme(self, scheme):
    """
    Build the command that sets the sequence row color scheme.

    The undo side restores the scheme that was active before any preview
    started, or the current scheme when nothing is being previewed.

    :param scheme: The color scheme to be applied.
    :type scheme: color.AbstractRowColorScheme

    :return: The redo callable, the undo callable and the description
    :rtype: tuple
    """
    previous_scheme = self._curr_color_scheme
    if previous_scheme is None:
        previous_scheme = self.getColorScheme()
    apply_new = partial(self._applyColorScheme, scheme)
    apply_previous = partial(self._applyColorScheme, previous_scheme)
    return apply_new, apply_previous, "Apply Color Scheme"
def setSelectedResColor(self, color):
    """
    Enable colors, then set the highlight color of the selected residues.

    :param color: The highlight color, passed on to the alignment
    """
    options = self.options_model
    options.colors_enabled = True
    alignment = self.getAlignment()
    alignment.setSelectedResColor(color)
def clearAllHighlights(self):
    """
    Enable colors, then clear all highlights on the alignment.
    """
    options = self.options_model
    options.colors_enabled = True
    alignment = self.getAlignment()
    alignment.clearAllHighlights()
@QtCore.pyqtSlot()
def updateRowWrap(self):
    """
    Apply the options model's `wrap_sequences` setting to the table model
    and show the horizontal scrollbar only when sequences are not wrapped.
    """
    options = self.options_model
    self._table_model.setRowWrap(options.wrap_sequences)
    show_h_scrollbar = not options.wrap_sequences
    self.h_scrollbar.setVisible(show_h_scrollbar)
@prompt_if_seqs_hidden
@QtCore.pyqtSlot(object)
@QtCore.pyqtSlot(object, bool)
def sortBy(self, sort_by, reverse=False):
    """
    Sort the alignment by the specified criteria.

    :param sort_by: The criterion to sort on
    :type sort_by: viewconstants.SortTypes or properties.SequenceProperty

    :param reverse: Whether to sort in reverse (descending) order.
    :type reverse: bool
    """
    SortTypes = viewconstants.SortTypes
    # Sequence method to call (with the reference sequence) per metric
    reference_metrics = {
        SortTypes.Identity: 'getIdentity',
        SortTypes.Similarity: 'getSimilarity',
        SortTypes.Conservation: 'getConservation',
        SortTypes.Score: 'getSimilarityScore',
    }
    # Sort keys that need nothing but the sequence itself
    standalone_keys = {
        SortTypes.Name: lambda seq: seq.name,
        SortTypes.ChainID: lambda seq: seq.chain,
        SortTypes.NumGaps: lambda seq: len(seq.getGaps()),
        SortTypes.Length: lambda seq: len(seq),
    }

    alignment = self.getAlignment()
    if alignment is None:
        return
    if isinstance(sort_by, properties.SequenceProperty):
        alignment.sortByProperty(sort_by, reverse=reverse)
        return
    if sort_by not in reference_metrics:
        alignment.sort(key=standalone_keys[sort_by], reverse=reverse)
        return

    reference = self.getReferenceSeq()
    with_gaps = self.options_model.include_gaps
    method_name = reference_metrics[sort_by]

    def metric_key(seq):
        metric = getattr(seq, method_name)
        return metric(reference, consider_gaps=with_gaps)

    alignment.sort(key=metric_key, reverse=reverse)
# ==========================================================================
# Command Implementations (only called from within commands)
# ==========================================================================

@command.from_command_only
def _applyColorScheme(self, scheme):
    """
    Apply a new sequence color scheme and mirror it in the options model.

    Any color scheme that is currently being previewed is discarded first.
    """
    self._restoreColorScheme()
    self._table_model.updateColorScheme(viewconstants.RowType.Sequence,
                                        scheme)
    # Changing the options model would trigger the command again, so the
    # updates are made inside the `_changingOptionsModel` context manager.
    with self._changingOptionsModel():
        options = self.options_model
        if scheme.custom:
            options.custom_color_scheme = scheme
        colors_on = not isinstance(scheme, color.NoColorScheme)
        options.colors_enabled = colors_on
        if colors_on:
            options.seq_color_scheme = scheme


@QtCore.pyqtSlot(color.AbstractRowColorScheme)
def _previewColorScheme(self, scheme):
    """
    Show a color scheme for sequences without adding it to the undo stack.
    Calling `_restoreColorScheme` removes the preview.
    """
    # Remember the real scheme only when the first preview starts
    if self._curr_color_scheme is None:
        self._curr_color_scheme = self.getColorScheme()
    self._table_model.updateColorScheme(viewconstants.RowType.Sequence,
                                        scheme)


def _restoreColorScheme(self):
    """
    Remove any color scheme that's currently being previewed.
    """
    remembered = self._curr_color_scheme
    if remembered is not None:
        self._table_model.updateColorScheme(viewconstants.RowType.Sequence,
                                            remembered)
        self._curr_color_scheme = None


@QtCore.pyqtSlot()
def _updateSeqStatusModel(self):
    """
    Update all parts of the sequence status model.
    """
    alignment = self.getAlignment()
    status = self.seq_status_model
    status.num_total = len(alignment)
    status.num_hidden = sum(
        1 for is_shown in alignment.getSeqShownStates() if not is_shown)
    self._updateColorScheme()
    self._updateStatusModelRefSeq()
    self._updateStatusModelNumSelected()
    self._updateStatusModelNumStructs()


@QtCore.pyqtSlot()
def _updateCanDeleteAllPredictions(self):
    """
    Update `can_delete_predictions`: true when any sequence in the
    alignment reports at least one kind of prediction.
    """
    alignment = self.getAlignment()
    widget_model = self.model

    def seq_has_prediction(seq):
        checks = (
            seq.hasDisorderedRegionsPredictions(),
            seq.hasDisulfideBondPredictions(),
            seq.hasDomainArrangementPredictions(),
            seq.hasSolventAccessibility(),
            seq.hasSSAPredictions(),
        )
        return any(checks)

    widget_model.menu_statuses.can_delete_predictions = any(
        seq_has_prediction(seq) for seq in alignment)


@QtCore.pyqtSlot()
def _updateColorScheme(self):
    """
    Called when the widget's alignment length changes.
    """
    scheme = self.getColorScheme()
    if isinstance(scheme, color.PositionScheme):
        scheme.setLength(self.getAlignment().num_columns)
    needs_reapply = isinstance(
        scheme, (color.PositionScheme, color.SimilarityScheme))
    if needs_reapply:
        # We are working with a copy of the original scheme, so we
        # must re-apply it here.
        self.applyColorScheme(scheme)


@QtCore.pyqtSlot()
def _updateStatusModelRefSeq(self):
    """
    Store the formatted reference sequence name in the status model.
    """
    reference = self.getReferenceSeq()
    self.seq_status_model.ref_seq = format_seq_name_with_chain(
        reference, self.model.split_chain_view)


@QtCore.pyqtSlot()
def _updateStatusModelNumSelected(self):
    """
    Store the number of selected sequences in the status model.
    """
    n_selected = len(self.getSelectedSequences())
    self.seq_status_model.num_selected = n_selected


@QtCore.pyqtSlot()
def _updateStatusModelNumStructs(self):
    """
    Store structure counts in the status model: the number counted under
    visibilities flagged in `viewconstants.included_map`, and the total
    over all non-None visibilities.
    """
    counts = self.getAlignment().getWorkspaceCounts()
    in_workspace = 0
    for visibility, included in viewconstants.included_map.items():
        if included:
            in_workspace += counts[visibility]
    total = 0
    for visibility, count in counts.items():
        if visibility is not None:
            total += count
    struct_status = self.seq_status_model.num_structs
    struct_status.num_in_ws = in_workspace
    struct_status.num_total = total


@QtCore.pyqtSlot()
def _updateResStatusAndEditStatus(self):
    """
    Update residue statuses and the edit toolbar.

    Should be called when residue selection changes or the alignment
    changes in ways that affect the shape of the residue selection (e.g.
    sequences reordered, split chain toggled)
    """
    self._updateResStatusModel()
    # Must come second: it reads the res status model updated above
    self._updateResEditStatuses()


def _updateResStatusModel(self):
    """
    Store numbers of residues and blocks selected in the res status model.
    The model is reset when nothing is selected.
    """
    selection_model = self.getAlignment().res_selection_model
    selection = selection_model.getSelection()
    status = self.res_status_model
    if not selection:
        status.reset()
        return
    gap_count = sum(1 for elem in selection if elem.is_gap)
    residue_count = len(selection) - gap_count
    touched_seqs = {elem.sequence for elem in selection}
    status.num_res_selected = residue_count
    status.num_gaps_selected = gap_count
    status.num_seqs_with_selected_elems = len(touched_seqs)
    status.num_blocks_selected = selection_model.numBlocksSelected()


def _updateResEditStatuses(self):
    """
    Set menu statuses related to editing residues. Uses data set on
    `self.res_status_model`.
    """
    for name, enabled in self._getResEditStatuses().items():
        setattr(self.model.menu_statuses, name, enabled)


def _getResEditStatuses(self):
    """
    Return a dict of statuses related to editing residues. Uses data set on
    `self.res_status_model`.

    :return: Mapping of menu status name to whether the action is enabled
    :rtype: dict(str, bool)
    """
    alignment = self.getAlignment()
    selection_model = alignment.res_selection_model
    result = dict.fromkeys((
        'can_copy_residues',
        'can_delete_gaps',
        'can_insert_gap',
        'can_insert_residues',
        'can_delete_residues',
        'can_replace_res_with_gaps',
        'can_replace_selected_elems',
        'can_change_elem',
    ), False)

    anything_selected = selection_model.hasSelection()
    if not anything_selected:
        # If no residues are selected, can't do any edit actions
        return result

    status = self.res_status_model
    one_block_one_seq = (status.num_blocks_selected == 1 and
                         status.num_seqs_with_selected_elems == 1)
    # Copying is offered for a single block within a single sequence
    result['can_copy_residues'] = one_block_one_seq

    if alignment.alnSetResSelected():
        # Aln sets can't be edited (even to insert gaps)
        return result

    result['can_delete_gaps'] = status.num_gaps_selected > 0
    result['can_insert_gap'] = anything_selected

    if self.isWorkspace():
        # On the workspace tab, can only edit gaps
        return result

    result['can_insert_residues'] = one_block_one_seq
    # Structured residues can't be edited
    has_structured = selection_model.anyStructuredResiduesSelected()
    unstructured_only = not has_structured
    result['can_delete_residues'] = unstructured_only
    result['can_replace_res_with_gaps'] = unstructured_only
    result['can_replace_selected_elems'] = (not has_structured and
                                            one_block_one_seq)
    one_element = (
        status.num_res_selected + status.num_gaps_selected == 1)
    result['can_change_elem'] = one_element and not has_structured
    return result


@QtCore.pyqtSlot(bool)
def _updateResOutlineState(self, enabled):
    """
    Show or hide residue outlines on the view.
    """
    outlines_shown = enabled
    self.view.setResOutlinesShown(outlines_shown)
def saveImage(self):
    """
    Save the multiple sequence alignment as an image file.

    The export type (entire alignment, visible alignment, or sequence
    logo), the target file and the format (PNG or PDF) are read from the
    image export dialog. Nothing is written if the alignment is empty or
    the dialog is cancelled.

    Temporary UI changes made for rendering (hidden scrollbar, forced
    sequence-logo row, models set on copied views) are always undone,
    even if rendering or saving raises.

    :raise RuntimeError: if the selected format is neither PNG nor PDF.
    """
    if self._table_model.export_model.rowCount() == 0:
        self.warning(title="No Alignment",
                     text="Alignment is currently empty.")
        return
    accepted = self.image_export_dialog.exec()
    if not accepted:
        return
    model = self.image_export_dialog.model
    logo_was_on = (ALN_ANNO_TYPES.sequence_logo
                   in self._table_model.getShownRowTypes())
    visible_scrollbar = self.v_scrollbar.isVisible()
    # Copied views whose model must be cleared once printing is over
    tmp_views = ()
    try:
        try:
            if model.export_type == model.ExportTypes.ENTIRE_ALN:
                view_to_print, tmp_info, tmp_metrics = (
                    self._createNoScrollView())
                tmp_views = (tmp_info, tmp_metrics)
            elif model.export_type == model.ExportTypes.VISIBLE_ALN:
                if visible_scrollbar:
                    self.v_scrollbar.setVisible(False)
                    # Needed to resize view after hiding scrollbar
                    application.process_events()
                view_to_print = self
            else:  # sequence logo
                self._table_model._setVisibilityForRowType(
                    ALN_ANNO_TYPES.sequence_logo, True)
                view_to_print = self.getLogoView()
            renderer = get_svg_renderer(view_to_print)
        finally:
            # Bring the scrollbar back as soon as rendering is over
            if visible_scrollbar:
                self.v_scrollbar.setVisible(True)
        # the following makes sure ext is always in lower case.
        file_name = self.image_export_dialog.selectedFile()
        name, _ = os.path.splitext(file_name)
        file_name = f"{name}.{model.format.name.lower()}"
        if model.format == model.Format.PNG:
            save_png(renderer, file_name, model.dpi)
        elif model.format == model.Format.PDF:
            save_pdf(renderer, file_name)
        else:
            raise RuntimeError(
                'Attempting to export to an unsupported file format: %s.' %
                (model.format.name.lower()))
    finally:
        self._table_model._setVisibilityForRowType(
            ALN_ANNO_TYPES.sequence_logo, logo_was_on)
        # MSV-1883, MSV-1884 - The python wrappers for these widgets do not
        # get fully garbage collected when the signals remain connected, so
        # we must manually disconnect them here.
        for tmp_view in tmp_views:
            tmp_view.setModel(None)
def getLogoView(self):
    """
    Build a view that shows only the rulers and the sequence logo.

    :return: the logo view
    :rtype: view.LogoAlignmentView
    """
    logo_view = view.LogoAlignmentView(self)
    logo_view.setLightMode(self._light_mode)
    # The proxy sits on top of the table model's top model
    logo_proxy = viewmodel.ExportLogoProxyModel()
    logo_proxy.setSourceModel(self._table_model.top_model)
    logo_view.setModel(logo_proxy)
    logo_view.setFrameStyle(QtWidgets.QFrame.NoFrame)
    return logo_view
def _createNoScrollView(self):
    """
    Assemble a widget holding copies of the info, alignment and metrics
    views with no scroll bars (the originals can't be reused because a
    view can't live in two layouts at the same time).

    :return: 3-tuple of the no-scroll view widget, the copied info view and
        the copied metrics view.
    :rtype: tuple(view.NoScrollAlignmentView, view.AlignmentInfoView,
        view.AlignmentMetricsView)
    """
    export_view = self._getNoScrollAlignmentView()
    export_view.setModel(self._table_model.export_model)
    export_view.setFrameStyle(QtWidgets.QFrame.NoFrame)
    metrics = copy.copy(self.aln_metrics_view)
    info = copy.copy(self.aln_info_view)
    info.setMinimumWidth(self.aln_info_view.width())

    container = QtWidgets.QWidget(self)
    # give same object name as self.view_widget so it inherits style
    container.setObjectName("view_widget")
    container.setLayout(QtWidgets.QHBoxLayout())
    row = container.layout()
    row.setContentsMargins(0, 0, 0, 0)
    row.setSpacing(0)
    container.setSizePolicy(QtWidgets.QSizePolicy.Ignored,
                            QtWidgets.QSizePolicy.Ignored)
    for child in (info, export_view, metrics):
        child.setLightMode(self._light_mode)
        row.addWidget(child)

    # Size the container to fit all three views. The original info view's
    # width is used because the copy hasn't had time to update.
    total_width = (self.aln_info_view.size().width() +
                   export_view.size().width() + metrics.size().width())
    # A little extra padding is added at the bottom
    total_height = max(export_view.size().height(),
                       metrics.size().height()) + 10
    container.resize(total_width, total_height)
    # Force resize event
    row.update()
    row.activate()
    return container, info, metrics
def timeScrolling(self, only_horizontal=False, print_to_stdout=True):
    """
    Scroll through the alignment step by step and measure how long it takes.

    :param only_horizontal: If True, the alignment will only be scrolled
        horizontally. If False, the alignment will be scrolled both
        vertically and horizontally.
    :type only_horizontal: bool

    :param print_to_stdout: Whether to print a message when scrolling is
        starting and ending.
    :type print_to_stdout: bool

    :return: The time (in seconds) to scroll the alignment.
    :rtype: float
    """
    app = QtWidgets.QApplication.instance()
    pump_events = partial(self._processEventsMultiple, app)

    def redraw():
        self.view.update()
        pump_events()

    def scroll_back(bar):
        # Step back until the bar is at its origin again
        while bar.value() > 0:
            bar.triggerAction(bar.SliderSingleStepSub)
            redraw()

    if print_to_stdout:
        print("Timing scrolling...")
    started = time.perf_counter()
    if not only_horizontal:
        vbar = self.view.verticalScrollBar()
        seen = None
        # Step forward until the value stops changing
        while vbar.value() != seen:
            seen = vbar.value()
            pump_events()
            vbar.triggerAction(vbar.SliderSingleStepAdd)
            pump_events()
            redraw()
        scroll_back(vbar)
    hbar = self.view.horizontalScrollBar()
    seen = None
    while hbar.value() != seen:
        seen = hbar.value()
        hbar.triggerAction(hbar.SliderSingleStepAdd)
        redraw()
    scroll_back(hbar)
    scroll_time = time.perf_counter() - started
    if print_to_stdout:
        print("scroll time =", scroll_time)
    return scroll_time
def timeScrollingByPage(self):
    """
    Page through the alignment and print the elapsed time to the terminal.

    Scrolling is only done downwards and rightwards so that no cached data
    is used.
    """
    app = QtWidgets.QApplication.instance()

    def page_to_end(bar):
        # The maximum is read once, before paging starts
        limit = bar.maximum()
        while bar.value() < limit:
            bar.triggerAction(bar.SliderPageStepAdd)
            self.view.update()
            self._processEventsMultiple(app)

    print("Timing scrolling...")
    started = time.perf_counter()
    page_to_end(self.view.verticalScrollBar())
    page_to_end(self.view.horizontalScrollBar())
    finished = time.perf_counter()
    print("scroll time =", finished - started)
def _processEventsMultiple(self, app): """ Call processEvents multiple times in case any pending timer slots trigger additional timers. :param app: The Qt application :type app: QtWidgets.QApplication """ for _ in range(25): app.processEvents()
def profileScrolling(self):
    """
    Schedule a profiled scroll of the alignment and return immediately;
    `profileScrollingBlocking` is invoked through a zero-delay single-shot
    timer.
    """
    run_profiled = self.profileScrollingBlocking
    QtCore.QTimer.singleShot(0, run_profiled)
def profileScrollingBlocking(self,
                             only_horizontal=False,
                             print_to_stdout=True,
                             filename=None):
    """
    Profile `timeScrolling` and return once scrolling has completed.

    :param only_horizontal: If True, the alignment will only be scrolled
        horizontally. If False, the alignment will be scrolled both
        vertically and horizontally.
    :type only_horizontal: bool

    :param print_to_stdout: Whether to print the filename, the measured
        time, and a message when scrolling is starting and ending.
    :type print_to_stdout: bool

    :param filename: The file to save profiling data to. If not given, a
        filename will be generated using the current date and time.
    :type filename: str or None

    :return: The time (in seconds) to scroll the alignment.
    :rtype: float
    """
    if filename is None:
        filename = time.strftime("profile%Y%m%d-%H%M%S.prof")
    if print_to_stdout:
        destination = os.path.join(os.getcwd(), filename)
        print("writing profiling to", destination)
    scroll_kwargs = {
        'only_horizontal': only_horizontal,
        'print_to_stdout': print_to_stdout,
    }
    scroll_time = profiling.profile_call(self.timeScrolling,
                                         profile_filename=filename,
                                         **scroll_kwargs)
    if print_to_stdout:
        print("finished profiling")
    return scroll_time
def profileScrollingByPage(self):
    """
    Schedule a profiled page-by-page scroll of the alignment through a
    zero-delay single-shot timer.

    Scrolling is only done downwards and rightwards so that no cached data
    is used.
    """
    run_profiled = self._profileScrollingByPage
    QtCore.QTimer.singleShot(0, run_profiled)
def _profileScrollingByPage(self):
    """
    Run `timeScrollingByPage` under the profiler and write the statistics
    to a timestamped file in the current working directory.
    """
    filename = time.strftime("profile_by_page%Y%m%d-%H%M%S.prof")
    destination = os.path.join(os.getcwd(), filename)
    print("writing profiling to", destination)
    # The profiled statement looks up `self` in the locals passed here
    profile.runctx("self.timeScrollingByPage()", globals(), locals(),
                   filename)
    print("finished profiling")
@QtCore.pyqtSlot()
def onWorkspaceColorsChanged(self):
    """
    Pull colors from the workspace, but only while colors are enabled and
    the active sequence color scheme is a workspace scheme.
    """
    options = self.options_model
    if not options.colors_enabled:
        return
    if not isinstance(options.seq_color_scheme, color.WorkspaceScheme):
        return
    self.pullWorkspaceColors()
def pullWorkspaceColors(self):
    """
    Fetch the workspace colors from the structure model and push them into
    the table model. The options model is left untouched.
    """
    self._table_model.updateResidueColors(
        self._structure_model.getWorkspaceColors())
def applyColorsToWorkspace(self, all_atoms=False):
    """
    Push the MSV residue colors from the table model to the workspace.

    :param all_atoms: Whether to color all atoms or just carbons
    :type all_atoms: bool
    """
    self._structure_model.setWorkspaceColors(
        self._table_model.getResidueColors(), all_atoms=all_atoms)
def loadPdbs(self, pdb_paths, index=None):
    """
    Load PDB files into the widget.

    This will load the PDB into the workspace for all widgets. (See
    comments on MSV-1326.) If an index is specified, sequences will be
    loaded at that index. Index cannot be specified for Workspace tab.

    On the Workspace tab the imported sequences become the selected
    sequences; on other tabs they are added to the alignment.

    :param pdb_paths: Paths to the pdb files
    :type pdb_paths: iterable(str)

    :param index: Index to insert the sequences at. Cannot be specified for
        Workspace widget.
    :type index: int or None

    :return: The imported sequences, or None if the import failed and the
        error was reported to the user (see `_protectedImport`)
    :rtype: list or None

    :raise: ValueError if index is specified for Workspace widget.
    """
    if self.isWorkspace():
        if index is not None:
            raise ValueError("Cannot load PDBs at specific index for "
                             "Workspace widget.")
        seqs = self._protectedImport(self._importFilesWorkspace, pdb_paths)
        if seqs:
            self.setSelectedSequences(seqs)
    else:
        seqs = self._protectedImport(self._structure_model.importFiles,
                                     pdb_paths)
        if seqs:
            self.importSeqs(seqs, index=index)
    return seqs
def _importFilesWorkspace(self, file_paths): all_seqs = [] for path in file_paths: seqs = self._structure_model.importStructuresIntoWorkspace(path) all_seqs.extend(seqs) return all_seqs def _protectedImport(self, func, *args, **kwargs): """ Call the specified function with the given arguments to import a structure file. If structure.py raises an exception (indicating that the file has an error), display the error to the user and return None. Otherwise, return the sequences returned by the function. :param func: The import function to call :type func: Callable :return: A list of imported sequences or None :rtype: list(schrodinger.protein.sequence.Sequence) or None """ with dialogs.wait_dialog("Importing sequences, please wait...", parent=self): try: return func(*args, **kwargs) except Exception as exc: self.warning(title="Exception", text=str(exc)) return None
def importIncluded(self, replace_selection=True):
    """
    Import the sequences of all entries included in the Maestro workspace,
    skipping chains already present in the alignment and reporting any
    that were skipped.
    """
    included = self._structure_model.getIncludedEntries()
    new_seqs = self._filterChains(included)
    if new_seqs:
        self.importSeqs(new_seqs, replace_selection=replace_selection)
    self._reportAlreadyLinked(included, new_seqs)
def importSelected(self):
    """
    Import the sequences of all entries selected in the Maestro project
    table, skipping chains already present in the alignment and reporting
    any that were skipped.
    """
    selected = self._structure_model.getSelectedEntries()
    new_seqs = self._filterChains(selected)
    if new_seqs:
        self.importSeqs(new_seqs)
    self._reportAlreadyLinked(selected, new_seqs)
def _filterChains(self, seqs): """ Return a sequence list that omits any sequences in the provided list that are already represented in the alignment. :param seqs: An iterable of sequences. :type seqs: iterable :return: A list of sequences. :rtype: list """ already_incl = { (seq.entry_id, seq.chain) for seq in self.getAlignment() } return [ seq for seq in seqs if (seq.entry_id, seq.chain) not in already_incl ] def _reportAlreadyLinked(self, all_seqs, imported_seqs): """ If some sequences were not imported, tell the user. """ num_all = len(all_seqs) num_skipped = num_all - len(imported_seqs) if num_skipped == 0: return if num_skipped == num_all: if num_all == 1: # Replace "1 of 1 chain" with "The chain" text = "The" else: # Replace "2 of 2 chains" with "All 2 chains" text = f"All {num_all}" else: text = f"{num_skipped} of {num_all}" p = inflect.engine() text += p.inflect( " requested plural_noun('chain', {1}) " "num({0}, False)plural_verb('was') skipped, as plural_noun('it') " "plural_verb('was') already linked to ".format( num_skipped, num_all)) text += 'a ' if num_skipped == 1 else '' text += p.inflect(f"plural('sequence', {num_skipped}) in this tab") self.info(text)
def getReferenceSeq(self):
    """
    Convenience accessor for the alignment's reference sequence.

    :return: reference sequence of the underlying alignment
    :rtype: `Sequence`
    """
    aln = self.getAlignment()
    return aln.getReferenceSeq()
def isWorkspace(self):
    """
    Report the model's `is_workspace` flag.

    :return: Whether this widget contains an alignment that is always kept
        in sync with the Maestro workspace.
    :rtype: bool
    """
    widget_model = self.model
    return widget_model.is_workspace
def setLightMode(self, enable):
    """
    Remember the light mode setting and forward it to the table model and
    to the alignment, metrics and info views.
    """
    self._light_mode = enable
    targets = (self._table_model, self.view, self.aln_metrics_view,
               self.aln_info_view)
    for target in targets:
        target.setLightMode(enable)
def setEditMode(self, enable):
    """
    Forward the edit mode setting to the alignment view.

    :param enable: Whether to enable edit mode.
    :type enable: bool
    """
    aln_view = self.view
    aln_view.setEditMode(enable)
def insertGapsToLeftOfSelection(self):
    """
    Delegate to the alignment view's `insertGapsToLeftOfSelection`.
    """
    aln_view = self.view
    aln_view.insertGapsToLeftOfSelection()
def deleteSelectedGaps(self):
    """
    Delegate to the alignment view's `deleteSelectedGaps`.
    """
    aln_view = self.view
    aln_view.deleteSelectedGaps()
def copySelectedSeqs(self):
    """
    Deep-copy every selected sequence.

    :return: Dictionary of new sequence copies mapped to their source
        sequence
    :rtype: dict
    """
    copies = {}
    for source_seq in self.getAlignment().getSelectedSequences():
        copies[copy.deepcopy(source_seq)] = source_seq
    return copies
@QtCore.pyqtSlot()
def duplicateAtBottom(self):
    """
    Duplicate the selected sequences as a single undoable command, using
    the alignment's default insertion position. Does nothing without a
    selection.
    """
    copies = self.copySelectedSeqs()
    aln = self.getAlignment()
    if not copies:
        return
    counted = inflect.engine().no("Sequence", len(copies))
    with command.compress_command(self.undo_stack,
                                  f"Duplicate {counted} at Bottom"):
        aln.duplicateSeqs(copies)
@QtCore.pyqtSlot()
def duplicateAtTop(self):
    """
    Duplicate the selected sequences as a single undoable command,
    inserting the copies at index 1. Does nothing without a selection.
    """
    copies = self.copySelectedSeqs()
    aln = self.getAlignment()
    if not copies:
        return
    counted = inflect.engine().no("Sequence", len(copies))
    with command.compress_command(self.undo_stack,
                                  f"Duplicate {counted} at Top"):
        aln.duplicateSeqs(copies, 1)
@QtCore.pyqtSlot()
def duplicateAsRef(self):
    """
    Duplicate the single selected sequence and make the copy the reference
    sequence. Does nothing unless exactly one sequence is selected and the
    reference sequence is allowed to change.
    """
    copies = self.copySelectedSeqs()
    if len(copies) != 1:
        return
    if not self._checkCanSetReferenceSeq():
        return
    duplicate = next(iter(copies))
    aln = self.getAlignment()
    desc = f"Duplicate {duplicate.fullname} as Reference Sequence"
    with command.compress_command(self.undo_stack, desc):
        aln.clearAnchors()
        aln.duplicateSeqs(copies, replace_selection=True)
        self.setSelectedSeqAsReference()
@QtCore.pyqtSlot()
def duplicateInPlace(self):
    """
    Duplicate the selected sequences, inserting each copy directly below
    the sequence it was copied from, as a single undoable command.
    """
    copies = self.copySelectedSeqs()
    if not copies:
        return
    aln = self.getAlignment()
    with command.compress_command(self.undo_stack, "Duplicate in Place"):
        aln.duplicateSeqsHighlightColorMap(copies)
        for duplicate, original in copies.items():
            # Index is looked up per copy since earlier inserts shift rows
            insert_at = aln.index(original) + 1
            aln.duplicateSeqs({duplicate: original}, insert_at)
def insertResidues(self):
    """
    Delegate to the alignment view's `insertResidues`.
    """
    aln_view = self.view
    aln_view.insertResidues()
@prompt_if_seqs_hidden
def deleteGapOnlyColumns(self):
    """
    Remove every alignment column that consists solely of gaps.
    """
    self.getAlignment().minimizeAlignment()
def copySelection(self):
    """
    Delegate to the alignment view's `copySelection`.
    """
    aln_view = self.view
    aln_view.copySelection()
def deleteSelection(self):
    """
    Delegate to the alignment view's `deleteSelection`.
    """
    aln_view = self.view
    aln_view.deleteSelection()
def changeResidues(self):
    """
    Delegate to the alignment view's `changeResidues`.
    """
    aln_view = self.view
    aln_view.changeResidues()
def replaceSelection(self):
    """
    Delegate to the alignment view's `replaceSelection`.
    """
    aln_view = self.view
    aln_view.replaceSelection()
def replaceSelectionWithGaps(self):
    """
    Delegate to the alignment view's `replaceSelectionWithGaps`.
    """
    aln_view = self.view
    aln_view.replaceSelectionWithGaps()
@QtCore.pyqtSlot()
def _createAlnSet(self):
    """
    Create a new alignment set and add all selected sequences to it. The
    user will be prompted for the name of the new set; nothing happens if
    no valid name is given.
    """
    set_name = self._promptForAlnSetName(
        previous_set=self._previous_set_name)
    if set_name is None:
        return
    aln = self.getAlignment()
    seqs = aln.seq_selection_model.getSelection()
    aln.addSeqsToAlnSet(seqs, set_name)
    # Remembered so the next prompt can suggest a follow-up name
    self._previous_set_name = set_name


@QtCore.pyqtSlot(str)
def _addToAlnSet(self, set_name):
    """
    Add all selected sequences to the specified alignment set.

    :param set_name: The name of the alignment set to add to
    :type set_name: str
    """
    aln = self.getAlignment()
    seqs = aln.seq_selection_model.getSelection()
    aln.addSeqsToAlnSet(seqs, set_name)


@QtCore.pyqtSlot()
def _removeSeqsFromAlnSet(self):
    """
    Remove all selected sequences from their alignment sets.
    """
    aln = self.getAlignment()
    seqs = aln.seq_selection_model.getSelection()
    aln.removeSeqsFromAlnSet(seqs)


@QtCore.pyqtSlot(str)
def _selectAlnSet(self, set_name):
    """
    Select all sequences in the named alignment set.
    """
    self._selectAlnSetImplementation(set_name, True)


@QtCore.pyqtSlot(str)
def _deselectAlnSet(self, set_name):
    """
    Deselect all sequences in the named alignment set.
    """
    self._selectAlnSetImplementation(set_name, False)


def _selectAlnSetImplementation(self, set_name, select):
    """
    Select-only or deselect the sequence in the given set.

    :param set_name: name of the sequence set.
    :type set_name: str

    :param select: Whether to select only or deselect.
    :type select: bool
    """
    aln = self.getAlignment()
    aln_set = aln.getAlnSet(set_name)
    if select:
        # "Select only": drop the existing selection first
        aln.seq_selection_model.clearSelection()
    aln.seq_selection_model.setSelectionState(aln_set, select)


@QtCore.pyqtSlot(str)
def _dissolveAlnSet(self, set_name):
    """
    Dissolve the named alignment set by removing all of its sequences
    from it.
    """
    aln = self.getAlignment()
    aln_set = aln.getAlnSet(set_name)
    aln.removeSeqsFromAlnSet(aln_set)


@QtCore.pyqtSlot()
def _gatherAlnSets(self):
    """
    Ask the alignment to gather its alignment sets.
    """
    self.getAlignment().gatherAlnSets()


@QtCore.pyqtSlot(str)
def _renameAlnSet(self, set_name):
    """
    Rename the named alignment set. The user will be prompted for the new
    name; nothing happens if no valid name is given.
    """
    new_name = self._promptForAlnSetName(current_name=set_name)
    if new_name is None:
        return
    aln = self.getAlignment()
    aln.renameAlnSet(set_name, new_name)


def _promptForAlnSetName(self, current_name=None, previous_set=None):
    """
    Prompt the user for an alignment set name.

    :param current_name: Current name for a rename action
    :type current_name: str or NoneType

    :param previous_set: Name of a previously created set. Only used when
        `current_name` is None: the suggested name is then derived from it
        with `_getNextSetName` instead of starting at 'Set 1'.
    :type previous_set: str or NoneType

    :return: Name if it's non-empty and unique, otherwise None
    :rtype: str or NoneType
    """
    if current_name is None:
        title = "Alignment set"
        label = "Enter set name"
        name = 'Set 1'
        if previous_set is not None:
            name = _getNextSetName(previous_set)
        # Check if the set with suggested name already exists.
        existing_sets = self.getAlignment().alnSetNames()
        while name in existing_sets:
            name = _getNextSetName(name)
        kwargs = {'text': name}
    else:
        title = "Rename alignment set"
        label = "Enter new set name"
        kwargs = {'text': current_name}
    set_name, ok_clicked = QtWidgets.QInputDialog.getText(
        self, title, label, **kwargs)
    aln = self.getAlignment()
    if not ok_clicked:
        return
    elif not set_name.strip():
        # Whitespace-only input counts as no name
        self.error("No name given.")
        return
    elif set_name in aln.alnSetNames():
        self.error(f'Set "{set_name}" already exists.')
        return
    return set_name
@QtCore.pyqtSlot()
def hideSelectedSeqs(self):
    """
    Ask the alignment to hide its selected sequences.
    """
    aln = self.getAlignment()
    aln.hideSelectedSeqs()
def showWorkspaceSequences(self):
    """
    Show only the sequences whose visibility is partially or fully visible,
    hiding all others, as a single undoable command. Does nothing if no
    sequence qualifies.
    """
    aln = self.getAlignment()
    inclusion = viewconstants.Inclusion
    shown_states = (inclusion.PartiallyVisible, inclusion.FullyVisible)
    ws_seqs = []
    for seq in aln:
        if seq.visibility in shown_states:
            ws_seqs.append(seq)
    if not ws_seqs:
        return
    with command.compress_command(self.undo_stack,
                                  "Show Workspace Sequences Only"):
        aln.showSeqs(ws_seqs, hide_others=True)
def showAllSeqs(self):
    """
    Show every sequence in the alignment and switch sequence filtering off.
    """
    aln = self.getAlignment()
    aln.showAllSeqs()
    options = self.model.options
    options.seq_filter_enabled = False
@QtCore.pyqtSlot()
def enableFindSequence(self):
    """
    Switch sequence filtering on in the model options.
    """
    options = self.model.options
    options.seq_filter_enabled = True
@QtCore.pyqtSlot()
def _clearSeqFilter(self):
    """
    Reset the sequence filter text in the model options to an empty string.
    """
    options = self.model.options
    options.seq_filter = ""
[docs]class ProteinAlignmentMsvWidget(AbstractMsvWidget):
    """
    MSV widget for protein alignments.

    :ivar alignmentFinished: Signal emitted by `callAlignMethod` after the
        alignment method it invoked returned a truthy result.
    :ivar startBlastTaskRequested: Signal emitted to request running a
        blast task. The widget doesn't start the task itself to avoid
        problems if the widget is deleted while the task is running.
    :ivar startPredictorBlastTaskRequested: Signal emitted to request
        running a blast task for predictors.
    :ivar proteinStructAlignResultsReady: Signal emitted when protein
        structure alignment results are ready. Emitted with a list of
        `align.AbstractStructureAligner.Result` namedtuples.
    """
    alignmentFinished = QtCore.pyqtSignal()
    startBlastTaskRequested = QtCore.pyqtSignal(blast.BlastTask)
    startPredictorBlastTaskRequested = QtCore.pyqtSignal(blast.BlastTask)
    proteinStructAlignResultsReady = QtCore.pyqtSignal(list)
def initSetUp(self):
    """
    Set up the descriptors-cleared warning timer, the task queue slots and
    the table mapping each alignment mode to the method that performs it.
    """
    super().initSetUp()
    # Single-shot timer that triggers the descriptors-cleared warning
    warning_timer = QtCore.QTimer()
    warning_timer.setSingleShot(True)
    warning_timer.timeout.connect(self._showDescriptorsClearedWarning)
    self._descriptors_cleared_timer = warning_timer
    # Task queues are created on demand
    self._pfam_task_queue = None
    self._kinase_task_queue = None
    self._kinase_cons_task_queue = None
    seq_mode = viewconstants.SeqAlnMode
    struct_mode = viewconstants.StructAlnMode
    self._align_method_map = {
        seq_mode.Multiple: self.multipleAlignment,
        seq_mode.PairwiseSS: self.pairwiseAlignmentSecondaryStructure,
        seq_mode.Pairwise: self.invokePairwiseAlignment,
        seq_mode.Structure: self.alignBySuperposition,
        seq_mode.Residue: self.alignByResidueNumbers,
        seq_mode.Profile: self.invokeProfileAlignment,
        struct_mode.Superimpose: self.superimposeStructures,
        struct_mode.BindingSite: self.runBindingSiteAlignment,
        struct_mode.Structure: self.runStructureAlignment,
    }
def initSetOptions(self):
    """
    Run the base class option setup, then create the BLAST search dialog.
    """
    super().initSetOptions()
    search_dialog = self._initBlastSearchDialog()
    self.blast_search_dialog = search_dialog
def defineMappings(self):
    """
    Extend the base class mappings with the BLAST search dialog and its
    "local only" checkbox.

    :return: The list returned by the base class, with two entries appended
    """
    mappings = super().defineMappings()
    M = self.model_class
    blast_dialog = self.blast_search_dialog
    blast_mappings = [
        (blast_dialog, M.blast_task.input),
        (blast_dialog.settings_dlg.ui.local_only_cb,
         M.options.blast_local_only),
    ]  # yapf: disable
    mappings.extend(blast_mappings)
    return mappings
def getSignalsAndSlots(self, model):
    """
    Extend the base class signal/slot pairs with the homology modeling
    pick toggles, option changes and the descriptors-cleared signal.

    :param model: The model whose signals are connected
    :return: The list returned by the base class, with the pairs appended
    """
    ss = super().getSignalsAndSlots(model)
    hm_input = model.homology_modeling_input
    hm_settings = hm_input.settings
    options = model.options
    protein_pairs = [
        (hm_input.pick_chimeraChanged, self._onHMPickChimeraChanged),
        (hm_settings.ligand_dlg_model.pick_ligandChanged,
         self._onHMPickLigandChanged),
        (hm_settings.pick_proximityChanged, self._onHMPickProximityChanged),
        (options.binding_site_distanceChanged,
         self._onBindingSiteDistanceChanged),
        (options.seq_filter_enabledChanged, self._onSeqFilterEnabledChanged),
        (options.seq_filterChanged, self._onSeqFilterChanged),
        (options.pick_modeChanged, self._onPickModeChanged),
        (options.kinase_features_enabledChanged,
         self._generateKinaseFeatures),
        (options.sequence_annotations.mutated, self._generateAnnotations),
        # Starting the timer defers the descriptors-cleared warning
        (model.aln_signals.descriptorsCleared,
         self._descriptors_cleared_timer.start),
    ]  # yapf: disable
    ss.extend(protein_pairs)
    return ss
@QtCore.pyqtSlot()
def _showDescriptorsClearedWarning(self):
    """
    Warn the user that descriptors were invalidated.
    """
    self.warning(title="Descriptors Invalidated",
                 text=_DESCRIPTORS_CLEARED_WARNING_MSG)


@QtCore.pyqtSlot(object)
def _onHMPickLigandChanged(self, pick):
    """
    Switch the pick mode to homology modeling binding site picking, or
    clear it when `pick` is falsy.
    """
    pick_mode = PickMode.HMBindingSite if pick else None
    self.model.options.pick_mode = pick_mode


@QtCore.pyqtSlot(object)
def _onHMPickChimeraChanged(self, pick):
    """
    Switch the pick mode to homology modeling chimera picking, or clear it
    when `pick` is falsy.
    """
    pick_mode = PickMode.HMChimera if pick else None
    self.model.options.pick_mode = pick_mode


@QtCore.pyqtSlot(object)
def _onHMPickProximityChanged(self, pick):
    """
    Switch the pick mode to homology modeling proximity picking, or clear
    it when `pick` is falsy.
    """
    pick_mode = PickMode.HMProximity if pick else None
    self.model.options.pick_mode = pick_mode


@QtCore.pyqtSlot()
def _onBindingSiteDistanceChanged(self):
    """
    Update the annotation binding site distance
    """
    distance = self.model.options.binding_site_distance.value
    for seq in self.model.aln:
        seq.annotations.setLigandDistance(distance)


@QtCore.pyqtSlot(object)
@QtCore.pyqtSlot(bool)
def _onSeqFilterEnabledChanged(self, enabled):
    """
    Forward the filter-enabled state to the split alignment.
    """
    self.model.split_aln.setSeqFilterEnabled(enabled)


@QtCore.pyqtSlot(object)
@QtCore.pyqtSlot(str)
def _onSeqFilterChanged(self, filter):
    """
    Forward the filter query to the split alignment.
    """
    self.model.split_aln.setSeqFilterQuery(filter)


@QtCore.pyqtSlot(object)
def _onPickModeChanged(self, pick_mode):
    """
    Sync the three homology modeling pick flags with the pick mode; a flag
    is True only when its own mode is the active one.
    """
    do_chimera = pick_mode is PickMode.HMChimera
    do_ligand = pick_mode is PickMode.HMBindingSite
    do_proximity = pick_mode is PickMode.HMProximity
    hm_input = self.model.homology_modeling_input
    hm_settings = hm_input.settings
    hm_input.pick_chimera = do_chimera
    hm_settings.ligand_dlg_model.pick_ligand = do_ligand
    hm_settings.pick_proximity = do_proximity


def _getNewAlignment(self):
    """
    Return a new, empty GuiProteinAlignment.

    NOTE(review): an earlier version of this docstring said a reference to
    the widget's undo stack is set on the alignment; this method does not
    do that - confirm where (or whether) it happens.

    :return: `schrodinger.application.msv.alignment.Alignment`
    """
    aln = gui_alignment.GuiProteinAlignment()
    return aln


def _getAlignmentView(self, undo_stack):
    """
    Return an alignment view appropriate for the widget.
    """
    return view.ProteinAlignmentView(undo_stack, self)


def _getNoScrollAlignmentView(self):
    """
    Return an alignment view without scroll bars, that is not shown
    """
    return view.NoScrollAlignmentView(None)


def _getAlignmentInfoView(self):
    """
    Return an alignment view for the alignment info columns on the right
    side
    """
    return view.AlignmentInfoView(self.view, self)


def _getAlignmentMetricsView(self):
    """
    Return an alignment view for the "frozen" metrics columns on the right
    side
    """
    return view.AlignmentMetricsView(self.view, self)


def _getAlignmentViewModel(self):
    """
    Return an alignment viewmodel appropriate for the widget.
    """
    return viewmodel.ViewModel(self)


@QtCore.pyqtSlot(set, set)
def _generateAnnotations(self, new_annos, old_annos):
    """
    Start calculation for newly-enabled sequence annotations.

    Note:: This is for any annotations that require a long calculation.

    :param new_annos: Annotations that are currently enabled
    :type new_annos: set

    :param old_annos: Annotations that were already enabled
    :type old_annos: set
    """
    added = new_annos - old_annos
    if added:
        if SEQ_ANNO_TYPES.kinase_conservation in added:
            self._generateKinaseConservation()


def _generateKinaseConservation(self):
    """
    Calculate kinase conservation if it's enabled in the options model.

    One task per ligand ASL is queued for every shown sequence that still
    needs the annotation.
    """
    if SEQ_ANNO_TYPES.kinase_conservation not in self.model.options.sequence_annotations:
        return
    num_tasks = 0
    for seq in self.getShownSplitSequences():
        # TODO MSV-3404 handle invalidating kinase features on edit
        # Skip sequences without structure, already annotated ones, and
        # ones already known not to be kinase chains
        if (not seq.hasStructure() or seq.is_kinase_cons_annotated or
            (seq.is_kinase_annotated and not seq.isKinaseChain())):
            continue
        if self._kinase_cons_task_queue is None:
            # Queue is created lazily, on the first sequence that needs it
            self._kinase_cons_task_queue = MsvTaskQueue(
                description="Computing kinase conservation...",
                max_running_tasks=1)
            self._kinase_cons_task_queue.queueDone.connect(
                self._onKinaseConsTaskQueueDone)
        for lig_asl in seq.annotations.ligand_asls:
            task = kinase.KinaseFeatureAndConservationTask()
            task.input.seq = seq
            task.input.ligand_asl = lig_asl
            self._kinase_cons_task_queue.addTask(task)
            num_tasks += 1
    if num_tasks:
        # `task` is the last task created above; it is only used here to
        # reach the RUNNING status constant
        if self._kinase_cons_task_queue.status != task.RUNNING:
            self._kinase_cons_task_queue.start()
            self.taskStarted.emit(self._kinase_cons_task_queue)


def _onKinaseConsTaskQueueDone(self):
    """
    Store the results of all finished kinase conservation tasks on their
    sequences, clean up the task temp dirs and drop the queue.
    """
    for task in self._kinase_cons_task_queue.getTasks():
        seq = task.input.seq
        # TODO MSV-3697 report errors
        if task.status is task.DONE:
            if not seq.is_kinase_annotated:
                seq.setKinaseFeatures(task.getKinaseFeatures())
            if seq.isKinaseChain():
                lig_asl = task.input.ligand_asl
                results = task.parseConservationOutput()
                seq.setKinaseConservation(results, lig_asl)
        task._tempdir.cleanup()
    self._kinase_cons_task_queue = None


def _generateKinaseFeatures(self):
    """
    Calculate kinase features if it's enabled in the options model.

    One task is queued for every shown sequence that has no kinase
    annotation yet.
    """
    if not self.model.options.kinase_features_enabled:
        return
    num_tasks = 0
    for seq in self.getShownSplitSequences():
        # TODO MSV-3404 handle invalidating kinase features on edit
        if seq.is_kinase_annotated:
            continue
        if self._kinase_task_queue is None:
            # Queue is created lazily, on the first sequence that needs it
            self._kinase_task_queue = MsvTaskQueue(
                description="Computing kinase features...",
                max_running_tasks=1)
            self._kinase_task_queue.queueDone.connect(
                self._onKinaseTaskQueueDone)
        task = kinase.KinaseFeatureTask()
        task.input.seq = seq
        self._kinase_task_queue.addTask(task)
        num_tasks += 1
    if num_tasks:
        # `task` is the last task created above; it is only used here to
        # reach the RUNNING status constant
        if self._kinase_task_queue.status != task.RUNNING:
            self._kinase_task_queue.start()
            self.taskStarted.emit(self._kinase_task_queue)


@QtCore.pyqtSlot()
def _onKinaseTaskQueueDone(self):
    """
    Store the features of all finished kinase feature tasks on their
    sequences, clean up the task temp dirs and drop the queue.
    """
    for task in self._kinase_task_queue.getTasks():
        # TODO MSV-3697 report errors
        if task.status is task.DONE:
            features = task.getKinaseFeatures()
            task.input.seq.setKinaseFeatures(features)
        task._tempdir.cleanup()
    self._kinase_task_queue = None
def onQuickAlignRequested(self):
    """
    Run the alignment method selected by the align settings in the model
    options.
    """
    align_settings = self.model.options.align_settings
    self.callAlignMethod(align_settings.getAlignMode())
def callAlignMethod(self, align_mode: Union[viewconstants.SeqAlnMode,
                                            viewconstants.StructAlnMode]):
    """
    Call the appropriate align method based on the alignment mode.

    `alignmentFinished` is emitted if the method returns a truthy value.

    :param align_mode: The sequence or structure alignment mode to run;
        must be a key of `self._align_method_map`.

    :raise KeyError: if no method is registered for `align_mode`.
    """
    align_method = self._align_method_map[align_mode]
    success = align_method()
    if success:
        self.alignmentFinished.emit()
def invokeProfileAlignment(self):
    """
    Run a profile alignment, first asking the user to confirm if anchors
    exist or residues are selected.

    :return: The result of `runProfileAlignment`, or None if the user
        declined one of the confirmations.
    """
    aln = self.getAlignment()
    intro = "Profile alignments are performed on the entire sequence. "
    # Each check is evaluated only if all earlier prompts were accepted
    confirmations = (
        (lambda: aln.getAnchoredResidues(), "Anchors Found",
         intro + "All anchors will be removed. Continue with the alignment?"),
        (lambda: aln.res_selection_model.hasSelection(),
         "Residues Selected",
         intro + "Residue selection will be cleared. Continue with the "
         "alignment?"),
    )
    for needs_prompt, title, text in confirmations:
        if needs_prompt() and not self.question(title=title, text=text):
            return
    return self.runProfileAlignment()
@validate_align.align_command(undo_desc="Profile Alignment",
                              can_align_sets=True)
def runProfileAlignment(self):
    """
    Run a profile-profile alignment.

    Either two alignment sets are aligned to each other, or the loose
    sequences are aligned one at a time to a single alignment set, each
    aligned sequence joining the profile used for the next one.

    :raise align.CantAlignException: if the sets and loose sequences to
        align don't correspond to profile alignment mode.
    """
    aln = self.getAlignment()
    aln.gatherAlnSets()
    aln_sets, loose_seqs = validate_align.get_aln_sets_and_seqs_to_align(aln)
    mode = validate_align.get_aln_set_align_mode(aln,
                                                 aln_sets=aln_sets,
                                                 loose_seqs=loose_seqs)
    if mode is not viewconstants.SeqAlnMode.Profile:
        raise align.CantAlignException(
            "Can only run profile-profile alignment with 2 alignment sets "
            "or 1 alignment set and loose sequences")
    assert len(aln_sets) > 0
    ref_set = aln_sets[0]
    assert aln.getReferenceSeq() in ref_set
    profile_idxs = [aln.index(seq) for seq in ref_set]
    if len(aln_sets) == 2:
        assert not loose_seqs
        other_idxs = [aln.index(seq) for seq in aln_sets[1]]
        self._profileAlignment(aln, profile_idxs, other_idxs)
        return
    assert len(aln_sets) == 1
    aligned_idx = None
    for loose_seq in loose_seqs:
        loose_idx = aln.index(loose_seq)
        # Virtually add previous loose seq to existing profile
        if aligned_idx is not None:
            profile_idxs.append(aligned_idx)
        # Align single loose seq to existing profile
        self._profileAlignment(aln, profile_idxs, [loose_idx])
        aligned_idx = loose_idx
def _profileAlignment(self, full_aln, indices1, indices2): """ :param full_aln: The full alignment to align :param indices1: Sequence indices to use as the first alignment. Must start at 0 and be adjacent. :param indices2: Sequence indices to use as the second alignment. Must start immediately after indices1 and be adjacent. """ prev_idx1 = None indices1 = sorted(indices1) if indices1[0] != 0: raise ValueError("First profile must start with the first sequence") for idx in indices1: if prev_idx1 is not None and idx - prev_idx1 != 1: raise ValueError("Profile indices must be adjacent") prev_idx1 = idx indices2 = sorted(indices2) if indices2[0] != prev_idx1 + 1: raise ValueError("Aln1 and Aln2 profile indices must be adjacent") prev_idx2 = None for idx in indices2: if prev_idx2 is not None and idx - prev_idx2 != 1: raise ValueError("Profile indices must be adjacent") prev_idx2 = idx AlnClass = type(full_aln) seqs1 = [] seqs2 = [] missing_indices = [] for idx, seq in enumerate(full_aln): if idx in indices1: seqs1.append(copy.deepcopy(seq)) elif idx in indices2: seqs2.append(copy.deepcopy(seq)) else: missing_indices.append(idx) if missing_indices: msg = "Missing indices must start immediately after indices2" assert missing_indices[0] == prev_idx2 + 1, msg assert len(seqs1) + len(seqs2) + len(missing_indices) == len(full_aln) aln1 = AlnClass(seqs1) aln2 = AlnClass(seqs2) job = muscle.MuscleJob(aln1, aln2) new_gap_indices = self._runMultipleAlignmentJob(job) if new_gap_indices is not None: full_aln.removeAllGaps() if missing_indices: new_gap_indices.extend([] for _ in missing_indices) full_aln.addGapsByIndices(new_gap_indices)
@validate_align.align_command(
    undo_desc="Multiple Alignment",
    split_by_anchors=True,
    can_align_sets=True,
    superimpose_param=gui_models.AlignSettingsModel.multiple.superimpose_after)
def multipleAlignment(self, *, _start=None, _end=None):
    """
    Run multiple sequence alignment.

    When the "find globally conserved" setting is on, the alignment
    coloring is switched to `ColorByAln.Matching` afterwards and Pfam is
    generated.

    `_start` and `_end` are private implementation params and not part of
    the public API (will be ignored if passed)
    """
    self._callAlignFunc(aln_func=self._multipleAlignment,
                        start=_start,
                        end=_end)
    multiple_settings = self.model.options.align_settings.multiple
    if multiple_settings.find_globally_conserved:
        self.model.options.color_by_aln = viewconstants.ColorByAln.Matching
        # NOTE(review): nesting reconstructed from a flattened listing;
        # confirm generatePfam() belongs inside this branch.
        self.generatePfam()
def _multipleAlignment(self, aln):
    """
    Align `aln` with the configured multiple alignment algorithm and replace
    its gaps with the result.

    A Clustal job is used when the algorithm setting is Clustal; otherwise a
    Muscle job is used. Nothing is changed if the job produces no result.

    :param aln: The alignment to align in place
    """
    settings = self.model.options.align_settings.multiple
    use_clustal = (settings.aln_algorithm is
                   viewconstants.MultAlnAlgorithm.Clustal)
    JobClass = clustal.ClustalJob if use_clustal else muscle.MuscleJob
    job = JobClass(aln)
    job.setGapPenalties(settings.gap_open_penalty,
                        settings.gap_extend_penalty)
    gap_indices = self._runMultipleAlignmentJob(job)
    if gap_indices is None:
        return
    aln.removeAllGaps()
    aln.addGapsByIndices(gap_indices)


def _runMultipleAlignmentJob(self, job):
    """
    Run an alignment job behind a progress dialog.

    :param job: The job to run; must provide `maximum_progress` and `run()`
    :return: For each sequence of the job's output, the list of gap indices
        in that sequence, or `None` if the job returned a falsy result.
    :rtype: list(list(int)) or NoneType
    """
    # The progress bar is driven by the job's reported maximum progress,
    # padded by 25% to account for the post-processing time to add gaps.
    padded_total = job.maximum_progress * 1.25
    with dialogs.aln_progress_dialog(job,
                                     text='Aligning...',
                                     total=padded_total,
                                     parent=self):
        aligned = job.run()
    if not aligned:
        return None
    return [[gap.idx_in_seq
             for gap in seq_gaps]
            for seq_gaps in aligned.getGaps()]
def optimizeAlignment(self,
                      target_seq,
                      template_seq,
                      ligand_asl=None,
                      residue_asl=None,
                      radius=None):
    """
    Start an optimize alignment task for a target/template sequence pair.

    A warning is shown and nothing is started if the template sequence has
    no structure. Otherwise the task is configured, announced through
    `taskStarted` and started; its status changes are handled by
    `_onOptimizeAlignmentTaskStatusChanged`.

    :param target_seq: Sequence set as the task's target
    :param template_seq: Sequence set as the task's template; must have
        structure
    :param ligand_asl: Value for the task's `ligand_asl` input
    :param residue_asl: Value for the task's `residue_asl` input
    :param radius: Value for the task's `radius` input; the task's own
        value is kept when this is `None`
    """
    if not template_seq.hasStructure():
        self.warning(
            "Optimize alignment requires template sequence to have structure."
        )
        return

    task = optimize_alignment.OptimizeAlignmentTask()
    task_input = task.input
    task_input.target_seq = target_seq
    task_input.template_seq = template_seq
    task_input.ligand_asl = ligand_asl
    task_input.residue_asl = residue_asl
    if radius is not None:
        task_input.radius = radius
    task.statusChanged.connect(self._onOptimizeAlignmentTaskStatusChanged)
    self.taskStarted.emit(task)
    task.start()
@QtCore.pyqtSlot()
def _onOptimizeAlignmentTaskStatusChanged(self):
    """
    React to a status change of an optimize alignment task.

    The task is obtained from `self.sender()`. On failure a warning is
    shown. On completion, the gaps of the whole alignment are replaced in a
    single compressed undo command: the target and template sequences take
    their gaps from the task output, every other sequence keeps its own.
    """
    task = self.sender()
    if task.status is task.FAILED:
        self.warning("Optimize alignment failed.")
    elif task.status is task.DONE:
        aln = self.getAlignment()
        gaps = []
        for seq in aln:
            # Pick the sequence whose gaps should be applied to `seq`
            if seq == task.input.target_seq:
                gap_seq = task.output.target_seq
            elif seq == task.input.template_seq:
                gap_seq = task.output.template_seq
            else:
                gap_seq = seq
            gaps.append([g.idx_in_seq for g in gap_seq.getGaps()])
        with command.compress_command(self.undo_stack,
                                      "Optimize alignment"):
            aln.removeAllGaps()
            aln.addGapsByIndices(gaps)


def _callAlignFunc(self, aln_func, start=None, end=None):
    """
    Helper method to align a subalignment

    If `start` and `end` are given, `aln_func` is run on the subalignment
    they delimit and any resulting gap changes are copied back to the full
    alignment. Otherwise `aln_func` is run on the full alignment.

    :param aln_func: Align method
    :type aln_func: callable

    :param start: Start index of the subalignment
    :type start: int or None

    :param end: End index of the subalignment (exclusive)
    :type end: int or None

    :raises align.CantAlignException: If only one of `start` and `end` is
        None
    """
    full_aln = self.getAlignment()
    if (start is None) != (end is None):
        msg = "Start and end must be both None or both int"
        raise align.CantAlignException(msg)
    if start is not None and end is not None:
        full_aln.padAlignment()
        aln = full_aln.getSubalignment(start, end)
        # Snapshot the gap positions so we can tell whether aln_func
        # changed anything
        orig_gap_indices = [
            [g.idx_in_seq for g in seq.getGaps()] for seq in aln
        ]
        aln_func(aln)
        new_gap_indices = [
            [g.idx_in_seq for g in seq.getGaps()] for seq in aln
        ]
        if new_gap_indices != orig_gap_indices:
            # Gap indices are relative to the subalignment, so offset them
            # by `start` to address the full alignment
            gaps_to_remove = []
            for seq, gap_indices in zip(full_aln, orig_gap_indices):
                gaps_to_remove.extend(
                    seq[g_idx + start] for g_idx in gap_indices)
            gap_idxs_to_add = [[idx + start
                                for idx in s_idxs]
                               for s_idxs in new_gap_indices]
            with full_aln.suspendAnchors():
                full_aln.removeElements(gaps_to_remove)
                # TODO MSV-1771 new gaps should be selected
                full_aln.addGapsByIndices(gap_idxs_to_add)
            full_aln.removeTerminalGaps()
    else:
        aln_func(full_aln)
def updateWorkspaceSelection(self):
    """
    Push this widget's residue selection to the Maestro workspace.

    Residue selection is removed for any workspace entries without a linked
    sequence and is updated for any entries awaiting delayed sync. (Residue
    selection is not automatically synchronized for newly included entries
    until selection is changed in either the workspace or the MSV. This
    method forces that synchronization.)
    """
    self._structure_model.delayedSyncFromMsvToWorkspace(self.getAlignment())
def invokePairwiseAlignment(self):
    """
    Run the pairwise alignment variant selected by the "lock gaps" setting.

    :return: The return value of `mergedPairwiseAlignment` when gaps are
        locked, otherwise the return value of `pairwiseAlignment`.
    """
    if self.model.options.align_settings.pairwise.lock_gaps:
        return self.mergedPairwiseAlignment()
    return self.pairwiseAlignment()
@validate_align.align_command(
    undo_desc="Pairwise Sequence Alignment",
    pairwise=True,
    superimpose_param=gui_models.AlignSettingsModel.pairwise.superimpose_after)
def pairwiseAlignment(self, *, _start=None, _end=None):
    """
    Run pairwise sequence alignment using modified Needleman-Wunsch
    algorithm.

    If residues are anchored, the user is asked whether to continue; on
    confirmation, pairwise alignment with locked reference gaps is performed
    instead, and on refusal nothing is aligned.

    If no sequences are selected, the first non-reference sequence will be
    aligned to the reference sequence. If one or more sequences are
    selected, each selected non-reference sequence will be aligned to the
    reference sequence.

    `_start` and `_end` are private implementation params and not part of
    the public API (will be ignored if passed)
    """
    has_anchors = len(self.getAlignment().getAnchoredResidues()) > 0
    if has_anchors:
        prompt = ("These sequences contain anchored residues. The alignment "
                  "will be run with all Reference gaps locked.\nContinue anyway?")
        confirmed = self.question(prompt,
                                  title="Anchored Residues Found",
                                  save_response_key="pairwise_anchors")
        if not confirmed:
            return
    # Reference gaps are locked exactly when anchors exist (and the user
    # agreed to continue)
    self._callAlignFunc(start=_start,
                        end=_end,
                        aln_func=partial(self._pairwiseAlignment,
                                         merge=has_anchors))
def _pairwiseAlignment(self, aln, merge=False):
    """
    Pairwise-align the selected non-reference sequences of `aln` to its
    reference sequence using the pairwise alignment settings.

    :param aln: The (sub)alignment to align in place
    :param merge: Whether to preserve the reference gaps. When False, the
        reference gaps are removed from `aln` before aligning.
    :type merge: bool
    """
    settings = self.model.options.align_settings.pairwise

    # The substitution matrixes are protein-specific
    is_nucleic = isinstance(aln.getReferenceSeq(),
                            sequence.NucleicAcidSequence)
    sub_matrix = (None if is_nucleic else getattr(
        constants, settings.sub_matrix, None))

    AlignerClass = (align.SchrodingerPairwiseAligner
                    if settings.prevent_ss_gaps else
                    align.BiopythonPairwiseAligner)
    aligner = AlignerClass(gap_open_penalty=settings.gap_open_penalty,
                           gap_extend_penalty=settings.gap_extend_penalty,
                           sub_matrix=sub_matrix,
                           preserve_reference_gaps=merge,
                           ss_constraints=settings.prevent_ss_gaps,
                           penalize_end_gaps=settings.penalize_end_gaps)
    if not merge:
        aln.removeElements(aln.getReferenceSeq().getGaps())

    # Get seqs_to_align using full_aln because aln may not have selection
    full_aln = self.getAlignment()
    full_ref = full_aln.getReferenceSeq()
    seqs_to_align = [
        aln[full_aln.index(seq)]
        for seq in full_aln.getSelectedSequences()
        if seq != full_ref
    ]

    constraints = None
    if (settings.set_constraints and
            full_aln.pairwise_constraints.hasConstraints()):
        # `aln` may not have pairwise constraints so we need to convert
        # `full_aln` pairwise constraints to use residue objects from `aln`
        ref_by_code = {
            res.rescode: res for res in aln.getReferenceSeq() if res.is_res
        }
        other_by_code = {res.rescode: res for res in aln[1] if res.is_res}
        constraints = []
        for ref_res, other_res in full_aln.pairwise_constraints.getPairs():
            mapped_ref = ref_by_code.get(ref_res.rescode)
            mapped_other = other_by_code.get(other_res.rescode)
            if mapped_ref is None or mapped_other is None:
                continue
            constraints.append((mapped_ref, mapped_other))

    aligner.run(aln, seqs_to_align, constraints=constraints)
@validate_align.align_command(
    undo_desc="Pairwise Sequence Alignment with Locked Reference Gaps",
    pairwise=True)
def mergedPairwiseAlignment(self, *, _start=None, _end=None):
    """
    Run a pairwise alignment that keeps the reference gaps in place.

    `_start` and `_end` are private implementation params and not part of
    the public API (will be ignored if passed)
    """
    locked_gaps_align = partial(self._pairwiseAlignment, merge=True)
    self._callAlignFunc(aln_func=locked_gaps_align, start=_start, end=_end)
@qt_utils.wait_cursor
@validate_align.align_command(
    undo_desc="Pairwise Sequence Alignment with Secondary Structure Prediction",
    pairwise=True)
def pairwiseAlignmentSecondaryStructure(self):
    """
    Run a pairwise alignment with `align.PrimeSTAAligner`.

    The "GPCR" protein family is passed to the aligner when the
    corresponding setting is on. The alignment's pairwise constraints are
    passed along only when the "set constraints" setting is on.
    """
    ss_settings = self.model.options.align_settings.pairwise_ss
    family = "GPCR" if ss_settings.use_gpcr_aln else None
    aligner = align.PrimeSTAAligner(protein_family=family)
    aln = self.getAlignment()
    constraints = (aln.pairwise_constraints.getPairs()
                   if ss_settings.set_constraints else None)
    aligner.run(aln, constraints=constraints)
@validate_align.align_command(undo_desc="Superposition Alignment",
                              structure=True)
def alignBySuperposition(self):
    """
    Align sequences by structure superposition.

    The pairwise residue C-alpha distances are converted to a scoring
    matrix where a short distance is a high score and vice versa. Gap
    penalties are taken from the pairwise alignment settings. Every
    sequence after the first is aligned, then the alignment is minimized.
    """
    pairwise_settings = self.model.options.align_settings.pairwise
    superposition_aligner = align.SuperpositionAligner(
        gap_open_penalty=pairwise_settings.gap_open_penalty,
        gap_extend_penalty=pairwise_settings.gap_extend_penalty)
    current_aln = self.getAlignment()
    superposition_aligner.run(current_aln, seqs_to_align=current_aln[1:])
    current_aln.minimizeAlignment()
@validate_align.align_command(
    undo_desc="Align by Residue Number",
    split_by_anchors=True,
    superimpose_param=gui_models.AlignSettingsModel.residue_number.
    superimpose_after)
def alignByResidueNumbers(self, *, _start=None, _end=None):
    """
    Align sequences so that residues with matching numbers line up.

    `_start` and `_end` are private implementation params and not part of
    the public API (will be ignored if passed)
    """
    self._callAlignFunc(aln_func=self._alignByResidueNumbers,
                        start=_start,
                        end=_end)
def _alignByResidueNumbers(self, aln):
    """
    Run `align.RescodeAligner` on `aln`, then minimize and pad it.

    :param aln: The alignment to align in place
    """
    align.RescodeAligner().run(aln)
    aln.minimizeAlignment()
    aln.padAlignment()
def superimposeStructures(self, show_panel=True):
    """
    Superimposes the workspace structures according to their sequence
    alignment in the MSV.

    Chains sharing an entry are split into separate entries first (after
    prompting the user).

    :param show_panel: Whether to show the superimpose ASL panel
    :type show_panel: bool

    :return: `True` once the superimposition has been requested; `None` if
        the sequences can't be superimposed or the user cancelled.
    :rtype: bool or NoneType
    """
    try:
        seqs = self._getSeqsForStructureAlignment(require_ref=False)
    except align.CantAlignException as exc:
        self.warning(title="Cannot Superimpose Structures", text=str(exc))
        return
    if seqs is None:
        return

    # Split entries if needed
    seqs = self._splitAllChains(seqs, prompt=True)
    if seqs is None:
        return

    if show_panel:
        maestro.command("showpanel superimpose")

    superimpose_settings = self.model.options.align_settings.superimpose
    aln = self.getAlignment()
    entry_residue_map = validate_align.get_residue_map_to_superimpose(
        aln, seqs, selected_only=superimpose_settings.align_sel_res_only)

    if self.model.is_workspace:
        seqs_before = list(aln)
    self._structure_model.superimposeByAlignment(entry_residue_map)
    if self.model.is_workspace:
        # Superimposing requires excluding entries that aren't selected and
        # reincluding them will move their linked sequences to the end. So
        # we manually reorder to keep the original sequence order.
        position_of = {seq: pos for pos, seq in enumerate(aln)}
        restore_order = [position_of[seq] for seq in seqs_before]
        already_ordered = restore_order == list(range(len(aln)))
        if not already_ordered:
            aln.reorderSequences(restore_order)
    return True
def runStructureAlignment(self):
    """
    Runs protein structure alignment.

    Gathers the sequences to align, asks the user how to proceed when the
    settings conflict with the selection, splits chains into separate
    entries if required, and hands over to `_runStructureAlignment`.

    :return: The return value of `_runStructureAlignment`, or `None` if the
        sequences can't be aligned or the user cancelled.
    """
    aln = self.getAlignment()
    ref_seq = aln.getReferenceSeq()
    # Work on a copy: the dialogs below may override individual settings
    # for this run only
    settings = copy.deepcopy(
        self.model.options.align_settings.protein_structure)
    Represents = viewconstants.StructAlnSequenceRepresents
    Transform = viewconstants.StructAlnTransform
    # Expand selection to entire entries if necessary
    if (self.model.split_chain_view and
            settings.seq_represents is Represents.EntireEntry):
        sel_model = aln.seq_selection_model
        eids = {seq.entry_id for seq in sel_model.getSelection()}
        eids.discard(None)
        entry_seqs = (seq for seq in aln if seq.entry_id in eids)
        sel_model.setSelectionState(entry_seqs, True)
    try:
        selected_seqs = self._getSeqsForStructureAlignment()
    except align.CantAlignException as exc:
        self.warning(title="Cannot Align Structures", text=str(exc))
        return
    if selected_seqs is None:
        return
    if (settings.seq_represents is Represents.SingleChain and
            settings.align_transforms is Transform.Existing):
        # Count how many of the sequences to align belong to the reference
        # entry versus other entries
        eids = [int(seq.entry_id) for seq in selected_seqs]
        eid_counts = Counter(eids)
        ref_eid_count = eid_counts.pop(int(ref_seq.entry_id))
        any_nonref_eids = bool(eid_counts)
        if ref_eid_count > 1 and not any_nonref_eids:
            # Only chains of the reference entry: offer to switch to
            # individual transforms
            dlg = dialogs.StructAlignExistingEntryMessageBox(parent=self)
            response = dlg.exec()
            if response:
                settings.align_transforms = Transform.Individual
            else:
                return
        elif ref_eid_count > 1 and any_nonref_eids and settings.align_seqs:
            # Several reference-entry chains plus other entries: offer to
            # continue without aligning the sequences
            dlg = dialogs.StructAlignMultipleRefChainMessageBox(parent=self)
            response = dlg.exec()
            if response:
                settings.align_seqs = False
            else:
                return
    should_split = (settings.seq_represents is Represents.SingleChain and
                    settings.align_transforms is Transform.Individual)
    if should_split:
        selected_seqs = self._splitAllChains(selected_seqs, prompt=False)
    # The reference sequence is not one of the sequences to be aligned
    selected_seqs.remove(ref_seq)
    return self._runStructureAlignment(settings, selected_seqs)
@validate_align.align_command(undo_desc="Structure Alignment",
                              structure=True,
                              split_res_blocks=False)
def _runStructureAlignment(self, settings, selected_seqs):
    """
    Run the structure aligner matching `settings` and show the results.

    For single-chain alignment with mapped sequences, one custom-ASL
    aligner is run per chain mapping. For other single-chain alignment, a
    plain `align.StructureAligner` is used. For whole-entry alignment, ASLs
    for the reference and non-reference structures are derived from the
    settings and validated before running a custom-ASL aligner.

    :param settings: Protein structure alignment settings
    :param selected_seqs: Sequences to align to the reference sequence

    :raises align.CantAlignException: If whole entries are aligned but the
        sequences (including the reference) span only one entry
    """
    aln = self.getAlignment()
    ref_seq = aln.getReferenceSeq()
    keywords = dict()
    if settings.force:
        # Based on mmshare/apps/ska/scripts/structalign_utility.py
        keywords['RECKLESS'] = 'yes'
    Represents = viewconstants.StructAlnSequenceRepresents
    if settings.seq_represents is Represents.SingleChain:
        if settings.map_seqs:
            results = []
            for each_map in settings.chain_name:
                # NOTE(review): seq_chain is indexed positionally; [1]
                # appears to be the chain name and [2] the entry id -
                # confirm against the settings model
                ref_chain = each_map.reference
                seq_chain = each_map.seq_chain[1]
                seq_entry_id = each_map.seq_chain[2]
                ref_asl = f"entry.id {ref_seq.entry_id} and chain.name {ref_chain}"
                other_asl = f"entry.id {seq_entry_id} and chain.name {seq_chain}"
                aligner = align.CustomASLStructureAligner(
                    keywords=keywords, ref_asl=ref_asl, other_asl=other_asl)
                aligner.run(aln, seqs_to_align=selected_seqs)
                _result = aligner.getResultSeqs()
                if _result:
                    # Only the first result of each mapping is reported
                    results.append(_result[0])
            self._showAlignmentResultDialog(results)
            return
        aligner = align.StructureAligner(keywords=keywords)
    else:
        eids = {seq.entry_id for seq in selected_seqs}
        eids.add(ref_seq.entry_id)
        eids.discard(None)
        if len(eids) == 1:
            raise align.CantAlignException(
                "Cannot run Protein Structure Alignment on entire entries "
                "with only 1 entry selected")
        RefASLMode = viewconstants.StructAlnRefASLMode
        other_asl = None
        # Get ASL for reference
        if settings.ref_asl_mode is RefASLMode.All:
            ref_asl = settings.ref_asl
        elif settings.ref_asl_mode is RefASLMode.Selected:
            ref_asl, other_asl = self._createSelectedResidueASLs(
                aln, settings.other_define_asl)
        elif settings.ref_asl_mode is RefASLMode.ASL:
            # TODO MSV-2790 May need to expand ASL to residues
            ref_asl = settings.ref_asl
        else:
            assert False
        # Get ASL for non-reference
        if settings.other_define_asl:
            # TODO MSV-2790 May need to expand ASL to residues
            other_asl = settings.other_asl
        elif other_asl is None:
            other_asl = ref_asl
        aligner = align.CustomASLStructureAligner(keywords=keywords,
                                                  ref_asl=ref_asl,
                                                  other_asl=other_asl)
        # Check that ASLs are valid
        result = aligner.evaluateASLs(aln, seqs_to_align=selected_seqs)
        if not result.ref_ok:
            self.error(f"No reference atoms matched the ASL {ref_asl}")
            return
        if not result.other_ok:
            self.error(
                f"No non-reference atoms matched the ASL {other_asl}")
            return
        if result.other_ok and result.other_skips:
            msg = ("The following non-reference sequence(s) have no atoms "
                   "matching the ASL and will not have their structures "
                   "aligned: ")
            msg += ", ".join(seq.name for seq in result.other_skips)
            self.warning(msg)
    aligner.run(aln, seqs_to_align=selected_seqs)
    if settings.align_seqs:
        self.alignBySuperposition()
    results = aligner.getResultSeqs()
    self._showAlignmentResultDialog(results)


def _showAlignmentResultDialog(self, results):
    """
    Publish structure alignment results and show them in a modal dialog.

    A warning is shown instead if there are no results.

    :param results: Results returned by a structure aligner's
        `getResultSeqs()`
    """
    if not results:
        self.warning("No Protein Structure Alignments were produced.")
        return
    self.proteinStructAlignResultsReady.emit(results)
    dlg = dialogs.StructAlignResultDialog(parent=self)
    dlg.setResults(results)
    dlg.run(modal=True)
@validate_align.align_command(undo_desc="Binding Site Alignment",
                              structure=True,
                              pairwise=True,
                              split_res_blocks=False)
def runBindingSiteAlignment(self):
    """
    Runs binding site alignment

    The binding site annotation is turned on, a binding site alignment task
    is created and started, and the results are handled when the task
    finishes: aligned structures are stored on the sequences, the sequences
    are optionally aligned by superposition, and a results dialog is shown.
    """
    selected_seqs = self._getSeqsForBindingSiteAlignment()
    if selected_seqs is None:
        return
    self.model.options.sequence_annotations.add(
        SEQ_ANNO_TYPES.binding_sites)
    aln = self.getAlignment()
    ref_seq = aln.getReferenceSeq()
    settings = self.model.options.align_settings.binding_site
    selected_res = []
    if settings.align_sel_res_only:
        # Restrict to the selected residues of the reference sequence
        all_selected_res = aln.res_selection_model.getSelection()
        selected_res = [
            res for res in ref_seq.residues() if res in all_selected_res
        ]
    task = self._getBindingSiteAlignTask(seqs_to_align=selected_seqs,
                                         settings=settings,
                                         res_to_align=selected_res)

    def _onBindingSiteAlignmentTaskDone():
        # Output structures are paired with the input sequences by position
        align_results = task.output.sts
        for seq, st in zip(selected_seqs, align_results):
            seq.setStructure(st)
        if settings.align_seqs:
            aln.res_selection_model.clearSelection()
            self.alignBySuperposition()
        dlg = dialogs.BindingSiteAlignResultDialog(parent=self)
        if task.input.align_sel_res_only:
            definition = "Selected reference residues"
        else:
            definition = f"Within {task.input.binding_site_cutoff}Å of ligand"
        dlg.setResults(results=task.output.rmsd_rows,
                       definition=definition,
                       ref_name=ref_seq.name)
        dlg.run(blocking=True, modal=True)
        if dlg.shouldAddRMSD():
            # The first sequence is skipped as the reference
            nonref_seqs = selected_seqs[1:]
            missing_titles = self._addBindingSiteRMSD(
                nonref_seqs, task.output.rmsd_rows)
            if missing_titles:
                self.warning(
                    "Could not find Binding Site Alignment RMSD for " +
                    ", ".join(missing_titles))

    task.taskDone.connect(_onBindingSiteAlignmentTaskDone)
    task.taskFailed.connect(lambda: self.error(
        title="No Alignment",
        text="There was a problem with the binding site alignment. "
        "Please check the job log."))
    self.taskStarted.emit(task)
    task.start()
def _addBindingSiteRMSD(self, seqs, rmsd_rows):
    """
    Set binding site RMSD as a structure property.

    RMSD rows are matched to sequences by structure title.

    :param seqs: Non-reference structured sequences that were aligned with
        binding site alignment
    :type seqs: iterable(schrodinger.protein.ProteinSequence)

    :param rmsd_rows: RMSD output from the binding site alignment task
    :type rmsd_rows: list[namedtuple]

    :return: Structure titles that did not have a corresponding RMSD value
    :rtype: list[str]
    """
    rmsds_by_title = {row.other_name: row.rmsd for row in rmsd_rows}
    missing_titles = []
    for seq in seqs:
        st = seq.getStructure()
        rmsd = rmsds_by_title.get(st.title)
        if rmsd is None:
            missing_titles.append(st.title)
        else:
            st.property[PROPNAME_BINDINGSITE_RMSD] = float(rmsd)
            # Store the modified structure back on the sequence
            seq.setStructure(st)
    return missing_titles


def _getBindingSiteAlignTask(self,
                             seqs_to_align,
                             settings,
                             res_to_align=None):
    """
    Creates the Binding Site Alignment task and populates its inputs from
    the sequences' structures and the settings. The task is not started.

    :param seqs_to_align: Sequences to be aligned
    :type seqs_to_align: iterable(schrodinger.protein.ProteinSequence)

    :param settings: Binding site alignment settings
    :type settings: gui_models.BindingSiteAlignSettingsModel

    :param res_to_align: Residues to be aligned if align selected residues
        only. Each residues corresponds to a residue in the ref sequence
    :type res_to_align: list(protein.residue.Residue)

    :return: The configured task
    """
    task = binding_site_align.BindingSiteAlignmentTask()
    task.input.sts = [seq.getStructure() for seq in seqs_to_align]
    task.input.binding_site_cutoff = settings.binding_site_cutoff.value
    task.input.mapping_dist = settings.mapping_dist.value
    task.input.previously_aligned = settings.previously_aligned
    task.input.align_sel_res_only = settings.align_sel_res_only
    task.input.res_list = res_to_align
    return task


def _createSelectedResidueASLs(self, aln, other_define_asl=False):
    """
    Create ASLs based on the selected residues in the sequences linked to
    the reference entry and the sequences linked to the non-reference
    entries.

    If the non-reference sequences have selected residues, a separate ASL
    will be constructed for those structures. Otherwise, other_asl will be
    the same as ref_asl.

    :param aln: The alignment
    :type aln: gui_alignment.GuiProteinAlignment

    :param other_define_asl: Whether a separate ASL is defined for the
        non-reference sequences. If True, other_asl will be None.

    :return: An ASL for the reference entry and an ASL for the
        non-reference entries or None if other_define_asl is True
    :rtype: tuple(str, str or NoneType)

    :raises align.CantAlignException: If no residues are selected, or none
        of the selected residues belong to the reference entry
    """
    selected_res = aln.res_selection_model.getSelection()
    if not selected_res:
        raise align.CantAlignException(
            "No residues are selected, can't align based on selected "
            "residues.")
    # Group the selected residues by entry; residues of sequences without
    # an entry ID are ignored
    residues_by_entry = defaultdict(list)
    for res in selected_res:
        entry_id = res.sequence.entry_id
        if entry_id is None:
            continue
        residues_by_entry[entry_id].append(res)
    ref_seq = aln.getReferenceSeq()
    # After this pop, residues_by_entry holds non-reference entries only
    ref_residues = residues_by_entry.pop(ref_seq.entry_id, None)
    if ref_residues is None:
        raise align.CantAlignException(
            "No reference residues are selected, can't align based on "
            "selected residues.")
    ref_asl = self._structure_model.generateResidueASL(ref_residues)
    if other_define_asl:
        other_asl = None  # will get replaced outside
    elif not residues_by_entry:
        # No non-reference residues selected; use same ASL
        other_asl = ref_asl
    else:
        other_asl = self._structure_model.generateEntryResidueASL(
            residues_by_entry)
    return ref_asl, other_asl


def _getSeqsForStructureAlignment(self, require_ref=True):
    """
    Gather sequences for structure alignment.

    Only shown sequences with structures are gathered. When the "align only
    selected sequences" setting is on, unselected structured sequences are
    used only if no selected structured sequence is available. If any
    selected sequences lack structures, the user is asked whether to
    continue with the structured ones. The reference sequence is placed
    first in the result if it has a structure.

    :param require_ref: Whether the reference sequence must have a
        structure

    :return: Sequences to align or `None` if the user declined to continue
    :rtype: list(sequence.ProteinSequence) or NoneType

    :raises align.CantAlignException: If the selected sequences aren't
        appropriate for aligning by structure
    """
    aln = self.getAlignment()
    can_aln, msg = validate_align.alignment_precheck(aln)
    if not can_aln:
        raise align.CantAlignException(msg)
    ref_seq = self.getReferenceSeq()
    # Ensure that the reference sequence has a structure.
    if require_ref and not ref_seq.hasStructure():
        msg = "The reference sequence must have a structure."
        raise align.CantAlignException(msg)
    sel_seqs = aln.seq_selection_model.getSelection()
    num_structureless = 0
    selected_structured = []
    unselected_structured = []
    for seq in aln.getShownSeqs():
        if seq is ref_seq:
            continue
        if seq.hasStructure():
            if (self.model.options.align_settings.align_only_selected_seqs
                    and seq not in sel_seqs):
                unselected_structured.append(seq)
            else:
                selected_structured.append(seq)
        elif seq in sel_seqs:
            num_structureless += 1
    if not selected_structured:
        if not unselected_structured:
            msg = "At least two sequences with structures must be present."
            raise align.CantAlignException(msg)
        elif num_structureless:
            msg = "At least two sequences with structures must be selected."
            raise align.CantAlignException(msg)
    if num_structureless:
        msg = (f"{num_structureless} of the selected sequences do not have "
               "structures. Align the structured sequences only?")
        if not self.question(msg):
            return None
    seqs_to_align = None
    if selected_structured:
        seqs_to_align = selected_structured
    elif unselected_structured:
        # When the selection is empty, align all seqs with structures
        seqs_to_align = unselected_structured
    if ref_seq.hasStructure():
        seqs_to_align.insert(0, ref_seq)
    return seqs_to_align


def _getSeqsForBindingSiteAlignment(self):
    """
    Gather sequences to align by binding site.

    The selected sequences must all have a structure and a ligand. More
    than one ligand per sequence is only accepted when aligning selected
    residues only.

    :return: Sequences to align, or `None` if gathering the structured
        sequences was cancelled
    :rtype: iterable(sequence.ProteinSequence) or NoneType

    :raises align.CantAlignException: If the selected sequences do not all
        have a structure and ligand
    """
    selected_seqs = self._getSeqsForStructureAlignment()
    if selected_seqs is None:
        return None
    settings = self.model.options.align_settings.binding_site
    for seq in selected_seqs:
        if len(seq.annotations.ligands) == 0:
            msg = f'{seq.name} does not have an associated ligand. ' \
                  'Binding sites cannot be aligned.'
            raise align.CantAlignException(msg)
        elif len(seq.annotations.ligands
                ) > 1 and not settings.align_sel_res_only:
            msg = f'{seq.name} has more than one ligand. ' \
                  'Binding site cannot be detected.'
            raise align.CantAlignException(msg)
    return selected_seqs


def _splitAllChains(self, sequences, prompt=True):
    """
    Split chains into separate entries if needed. The reference sequence
    will not be split.

    When any entry is split, the new sequences are moved to the top of the
    alignment and the returned sequences become the sequence selection.

    :param sequences: Sequences that are going to be aligned
    :type sequences: iterable(sequence.ProteinSequence)

    :param prompt: Whether to ask for permission to split chains
    :type prompt: bool

    :return: Sequences to align or `None` if split chains is not allowed.
    :rtype: set(sequence.ProteinSequence) or None
    """
    # In order to have a valid query, the selected sequences must come from
    # different entries - we can work around by splitting chains into
    # separate entries
    seqs_by_entry = defaultdict(list)
    for seq in sequences:
        seqs_by_entry[seq.entry_id].append(seq)
    selected_seqs = set()
    eids_to_split = []
    for eid, seqs in seqs_by_entry.items():
        if len(seqs) == 1:
            selected_seqs.add(seqs[0])
        else:
            eids_to_split.append(eid)
    if eids_to_split and prompt:
        if not af1.question(
                "Chains must be in separate entries. "
                "Split chains and proceed with the alignment?",
                parent=self,
                title="Attempting to Align Multimer Chains"):
            return None
    ref_seq = self.getReferenceSeq()
    if ref_seq.hasStructure():
        # ref seq is allowed to be structureless for superimposeStructures
        selected_seqs.add(ref_seq)
    if eids_to_split:
        to_move = []
        eid_chain_map = self._makeEidChainMap()
        for entry_id in eids_to_split:
            # Only keep chains that were selected for alignment
            keep_chains = {seq.chain for seq in seqs_by_entry[entry_id]}
            if entry_id == ref_seq.entry_id:
                keep_chains.discard(ref_seq.chain)
            new_seqs = self._disassociateChains(entry_id,
                                                eid_chain_map,
                                                keep_chains=keep_chains)
            selected_seqs.update(new_seqs)
            to_move.extend(new_seqs)
        # Move new seqs to the top
        self._moveSequences(to_move, viewconstants.Direction.Top)
        self.setSelectedSequences(selected_seqs)
    return selected_seqs


def _disassociateChains(self, entry_id, eid_chain_map, keep_chains):
    """
    Disassociates chains for an entry while preserving gaps and
    structureless residues.

    :param entry_id: The entry ID to split
    :type entry_id: int

    :param eid_chain_map: A mapping between entry ID, chain ID, and
        sequence
    :type eid_chain_map: dict(int, dict(str, sequence.Sequence))

    :param keep_chains: Names of the chains to keep; sequences of other
        chains are dropped from the result

    :return: Disassociated sequences, now with unique entry ids
    :rtype: list(sequence.Sequence)
    """
    # ask the structuremodel to disassociate the structure in maestro
    new_seqs = self._structure_model.disassociateChains(
        entry_id, is_workspace=self.model.is_workspace,
        keep_chains=keep_chains)
    new_seqs = [seq for seq in new_seqs if seq.chain in keep_chains]
    if not self.isWorkspace():
        # Outside the workspace tab the new sequences are added to the
        # alignment here
        seqs_to_add = [
            seq for seq in new_seqs if seq.chain in eid_chain_map[entry_id]
        ]
        self.getAlignment().addSeqs(seqs_to_add)
    new_eids = {seq.entry_id for seq in new_seqs}
    update_kwargs = dict(entry_id=entry_id,
                         eid_chain_map=eid_chain_map,
                         new_eids=new_eids)
    self._updateDisassociatedSeqs(**update_kwargs)
    return new_seqs


def _makeEidChainMap(self):
    """
    Create a mapping between entry ID, chain ID, and sequence for the
    alignment

    :return: A mapping between entry ID, chain ID, and sequence
    :rtype: dict(int, dict(str, sequence.Sequence))
    """
    eid_chain_map = defaultdict(dict)
    for seq in self.getAlignment():
        eid_chain_map[seq.entry_id][seq.chain] = seq
    # Return a plain dict so that unknown entry IDs raise KeyError
    return dict(eid_chain_map)


def _updateDisassociatedSeqs(self, entry_id, eid_chain_map, new_eids):
    """
    Update sequences that were just split from an entry to match their
    previous state. Gaps and structureless residues are copied.

    NOTE(review): earlier documentation also said that the reference
    sequence is updated and that the original sequences are moved to the
    bottom; this method does neither - confirm whether that happens
    elsewhere.

    :param entry_id: The entry ID that was split
    :type entry_id: int

    :param eid_chain_map: A mapping between entry ID, chain ID, and
        original sequence
    :type eid_chain_map: dict(int, dict(str, sequence.Sequence))

    :param new_eids: Entry IDs of the new structures corresponding to the
        chains of the split entry
    :type new_eids: iterable(int)
    """
    aln = self.getAlignment()
    for seq in aln:
        if (seq.entry_id not in new_eids or
                seq.chain not in eid_chain_map[entry_id]):
            continue
        old_seq = eid_chain_map[entry_id][seq.chain]
        # Copy gaps and structureless res from old to new
        for idx, res in enumerate(old_seq):
            if not res.hasStructure():
                aln.addElements(seq, idx, [copy.deepcopy(res)])


def _exportSelectedSeqs(self):
    """
    Open Sequence Export dialog with 'Selected Sequences' option selected.
    """
    dlg_model = self.seq_export_dialog.model
    dlg_model.which_sequences = dlg_model.Sequences.SELECTED
    self.exportSequences()
def exportSequences(self):
    """
    Show the sequence export dialog and export according to its options.

    Nothing is exported (and a warning is shown where applicable) if the
    dialog is cancelled, the alignment is empty, or the chosen "selected
    sequences"/"selected residues" options don't match any selection.
    """
    dialog = self.seq_export_dialog
    if not dialog.exec():
        return
    export_model = dialog.model
    aln = self.model.split_aln
    if not aln:
        self.warning(title="No Alignment",
                     text="Alignment is currently empty.")
        return

    selected_seqs_only = (
        export_model.which_sequences == export_model.Sequences.SELECTED)

    # 'Selected Sequences' requires a sequence selection
    if selected_seqs_only and not aln.seq_selection_model.getSelection():
        self.warning(title="No selected sequences",
                     text="Please select some sequences to be exported.")
        return

    # 'Selected Residues' requires a residue selection, which must overlap
    # the selected sequences when both options are chosen
    if export_model.which_residues == export_model.Residues.SELECTED:
        res_indices = aln.res_selection_model.getSelectionIndices()
        if not res_indices:
            self.warning(title="No selected residues",
                         text="Please select some residues to be exported.")
            return
        if selected_seqs_only:
            seq_indices = aln.seq_selection_model.getSelectionIndices()
            overlap = any(res_idx[0] in seq_indices
                          for res_idx in res_indices)
            if not overlap:
                self.warning(
                    title="No selected residues in selected sequences",
                    text="Please select some residues in a selected "
                    "sequence to be exported.")
                return

    self._exportSequences(export_model, dialog.selectedFile())
def _exportSequences(self, model, file_path):
    """
    Export sequences to file. Depending on the GUI option selected, either
    a single file is written or one file per chain / per protein.

    :param model: the model for sequence export
    :type model: gui_models.ExportSequenceModel

    :param file_path: Sequence file path
    :type file_path: str
    """
    aln = self.model.split_aln

    # Indices of the sequences to keep; None means "all sequences"
    subset_idxs = None
    if model.which_sequences == model.Sequences.SELECTED:
        subset_idxs = aln.seq_selection_model.getSelectionIndices()
    elif model.which_sequences == model.Sequences.DISPLAYED:
        subset_idxs = [
            i for i, shown in enumerate(aln.getSeqShownStates()) if shown
        ]

    # Work on a copy so the alignment shown in the GUI is never modified
    new_aln = copy.deepcopy(aln)

    # Only output selected residues
    if model.which_residues == model.Residues.SELECTED:
        all_res = {res for seq in new_aln for res in seq}
        sel_res = new_aln.res_selection_model.getSelection()
        with new_aln.modifyingStructure():
            new_aln.replaceResiduesWithGaps(all_res - sel_res)
        if not model.preserve_indices:
            # minimize the sequence but leave a gap column between each
            # block
            new_aln = new_aln.getAlignmentMinimizedWithSpaces()

    if subset_idxs is not None:
        # create a new alignment with just the requested sequences, in
        # alignment order. sorted() is used rather than list.sort() so the
        # object returned by the selection model is not mutated.
        seqs = [new_aln[idx] for idx in sorted(subset_idxs)]
        new_aln = aln.__class__(seqs)

    reference_seq = (new_aln.getReferenceSeq()
                     if model.include_similarity else None)

    writers = {
        model.Format.ALN: seqio.ClustalAlignmentWriter,
        model.Format.CSV: seqio.CSVAlignmentWriter,
        model.Format.SEQD: seqio.SeqDAlignmentWriter,
    }
    # Any format without a dedicated writer falls back to FASTA
    AW = writers.get(model.format, seqio.FastaAlignmentWriter)

    names_and_chains = [(seq.name, seq.chain) for seq in new_aln]
    needs_unique = len(set(names_and_chains)) != len(names_and_chains)

    if not model.create_multiple_files:
        AW.write(new_aln,
                 file_path,
                 use_unique_names=needs_unique,
                 export_annotations=model.include_ss_anno,
                 sim_ref_seq=reference_seq,
                 maxl=MAX_FASTA_LINE_LENGTH,
                 export_descriptors=model.export_descriptors)
    else:
        self._exportToMultipleFiles(model=model,
                                    aln=new_aln,
                                    writer=AW,
                                    file_path=file_path,
                                    use_unique_names=needs_unique,
                                    sim_ref_seq=reference_seq)


def _exportToMultipleFiles(self, model, aln, writer, file_path,
                           use_unique_names, sim_ref_seq):
    """
    Export sequences to multiple files. Sequence of each chain or sequence
    of each protein will be exported to a file.

    File names are derived from the sequence name (and chain). If two
    export units produce the same file name, a numeric suffix is appended
    to the later ones so that no file overwrites another.

    :param model: the model for sequence export
    :type model: gui_models.ExportSequenceModel

    :param aln: Alignment of selected sequences/residues to export
    :type aln: gui_alignment.GuiProteinAlignment

    :param writer: Sequence Alignment writer
    :type writer: seqio.FastaAlignmentWriter or seqio.ClustalAlignmentWriter

    :param file_path: Sequence file path containing
        `dialogs.PLACEHOLDER_FILE_NAME`, which is replaced by the name of
        each exported unit
    :type file_path: str

    :param use_unique_names: If True, write unique name for each sequence.
    :type use_unique_names: bool

    :param sim_ref_seq: Reference sequence to calculate similarities for the
        sequences to be exported. If None, similarity will not be exported.
    :type sim_ref_seq: `sequence.Sequence` or None
    """
    if model.split_file_by == model.SplitFileBy.Structure:
        # Group the chains of each protein as
        # {(sequence name, entry id): [sequences]}
        protein_seq_dict = defaultdict(list)
        for seq in aln:
            protein_seq_dict[(seq.name, seq.entry_id)].append(seq)
        export_units = [
            (f"{seqs[0].name}", seqs) for seqs in protein_seq_dict.values()
        ]
    else:
        export_units = [(f"{seq.name}_{seq.chain}", [seq]) for seq in aln]

    used_names = set()
    for base_name, seqs in export_units:
        slug = slugify(base_name)
        # Different units can share a name (e.g. equal structure names with
        # different entry ids); disambiguate instead of overwriting.
        file_name = slug
        suffix = 1
        while file_name in used_names:
            suffix += 1
            file_name = f"{slug}_{suffix}"
        used_names.add(file_name)

        seq_file_path = file_path.replace(dialogs.PLACEHOLDER_FILE_NAME,
                                          file_name)
        # export_descriptors is forwarded for consistency with the
        # single-file export path in _exportSequences
        writer.write(aln.__class__(seqs),
                     seq_file_path,
                     use_unique_names=use_unique_names,
                     export_annotations=model.include_ss_anno,
                     sim_ref_seq=sim_ref_seq,
                     maxl=MAX_FASTA_LINE_LENGTH,
                     export_descriptors=model.export_descriptors)
def openBlastSearchDialog(self):
    """
    Open the BLAST search dialog for the current reference sequence.

    If there is no reference sequence, a warning is shown and the dialog is
    not opened. Otherwise the BLAST task is reset to a local search of the
    reference sequence before the dialog is run modally.
    """
    query_seq = self.getReferenceSeq()
    if query_seq is not None:
        blast_task = self.model.blast_task
        blast_task.input.settings.location = blast.LOCAL
        blast_task.input.query_sequence = query_seq
        self.blast_search_dialog.run(modal=True, blocking=True)
        return
    self.warning(
        title="Set BLAST Query Sequence",
        text="There are no sequences available for a BLAST search.")
def _runComputeSequenceDescriptorsDialog(self): """ Runs the ComputeSequenceDescriptorsDialog and runs any required tasks to generate the sequence and structure descriptors requested from the dialog. """ dlg = dialogs.ComputeSequenceDescriptorsDialog(parent=self) ok_pressed = dlg.exec() if not ok_pressed: return if dlg.model.selected_seqs_only: seqs = self.getSelectedSequences() else: seqs = self.model.aln[:] if not seqs: return # Lambda slots with references to self cause problems with garbage # collection. To avoid this, we replace self with a weakref. self = weakref.proxy(self) @qt_utils.wait_cursor def run_descriptor_task(desc_task, seqs): desc_task.input.seqs = seqs desc_task.specifyTaskDir(desc_task.TEMP_TASKDIR) desc_task.start() desc_task.statusChanged.connect( lambda: self._onDescriptorTaskStatusChanged(desc_task)) self.taskStarted.emit(desc_task) # Generate sequence descriptors. requested_descriptors = [ desc.property_name for desc in dlg.model.selected_descriptors if desc.property_source is desc.Sequence ] if requested_descriptors: seq_desc_task = descriptors.SequenceDescriptorsTask() seq_desc_task.input.seqs = seqs seq_desc_task.input.requested_descriptors = requested_descriptors seq_desc_task.statusChanged.connect( lambda: self._onDescriptorTaskStatusChanged(seq_desc_task)) seq_desc_task.start() self.taskStarted.emit(seq_desc_task) # Generate structure descriptors if any structure descriptors were # requested from the dialog. 
if any(desc.property_source is desc.Structure for desc in dlg.model.selected_descriptors): sts_desc_task = descriptors.StructureDescriptorsTask() seqs = [seq for seq in seqs if seq.hasStructure()] if seqs: run_descriptor_task(sts_desc_task, seqs) def _onDescriptorTaskStatusChanged(self, task): if task.status != task.DONE: return output = task.output for seq in self.model.aln: if seq in output.descriptors: descriptors = output.descriptors[seq] seq.updateDescriptors(descriptors, task.PROP_SOURCE) self._onSeqPropsUpdated(self.model.options.sequence_properties) msg = """<html><p>New sequence descriptors have been calculated.</p> <p>To display the values, choose <i>Show properties...</i><br> from the View Options pane (+button).</p></html>""" self.info(msg) def _initBlastSearchDialog(self): dialog = dialogs.BlastSearchDialog(self) # Lambda slots with references to self cause problems with garbage # collection. To avoid this, we replace self with a weakref. self = weakref.proxy(self) dialog.blastSearchRequested.connect( lambda: self.startBlastTaskRequested.emit(self.model.blast_task)) return dialog def _getFinishedBlastTask(self): """ Get the most recently finished blast task """ task = self.model.blast_task if task.status is task.DONE: return task return None
def hasBlastResults(self, for_ref_seq=False):
    """
    Return whether the blast task is done.

    :param for_ref_seq: Whether to require that the results are for the
        current reference seq
    :type for_ref_seq: bool

    :return: Whether a finished blast task exists and, if `for_ref_seq` is
        True, whether its query sequence equals the current reference
        sequence. False if `for_ref_seq` is True and there is no reference
        sequence.
    :rtype: bool
    """
    ref_seq = self.getReferenceSeq()
    if for_ref_seq and ref_seq is None:
        # Without a reference sequence there can be no results for it.
        # Return an explicit False rather than falling through with None.
        return False
    blast_task = self._getFinishedBlastTask()
    done = blast_task is not None
    if not for_ref_seq:
        return done
    return done and blast_task.input.query_sequence == ref_seq
def downloadBlastPDBs(self, pdb_ids):
    """
    Download and load the structures for the given BLAST hits.

    If the blast task settings request alignment after download, the first
    loaded sequence is pairwise aligned to the reference sequence. IDs that
    could not be retrieved are reported to the user.

    :param pdb_ids: IDs of the structures to download; passed to
        `seqio.process_fetch_ids`
    """
    allow_remote = not self.model.options.blast_local_only
    ids_to_fetch = seqio.process_fetch_ids(pdb_ids, dialog_parent=self)
    _, pdb_result = dialogs.download_seq_pdb(ids_to_fetch,
                                             parent=self,
                                             pdb_remote_ok=allow_remote)
    paths = pdb_result.paths
    failed_ids = pdb_result.error_ids
    loaded_seqs = self.loadPdbs(paths)

    if (loaded_seqs and
            self.model.blast_task.input.settings.align_after_download):
        new_seq = loaded_seqs[0]
        aln = self.getAlignment()
        ref_seq = aln.getReferenceSeq()
        settings = self.model.options.align_settings
        # Temporarily restrict alignment to the selected sequences; the
        # user's setting is restored in the finally clause below
        saved_align_only = settings.align_only_selected_seqs
        settings.align_only_selected_seqs = True
        seq_sel_model = aln.seq_selection_model
        with seq_sel_model.suspendSelection(), \
                aln.res_selection_model.suspendSelection():
            try:
                seq_sel_model.setSelectionState({ref_seq, new_seq}, True)
                self.pairwiseAlignment()
            finally:
                settings.align_only_selected_seqs = saved_align_only

    if not failed_ids:
        return
    if allow_remote:
        self.warning(title="Error Retrieving Structures",
                     text="Could not retrieve the following ID(s) from "
                     f"the remote server: {', '.join(failed_ids)}")
    else:
        dialogs.LocalBlastPDBNoResultsMessageBox(
            self, ", ".join(failed_ids), count=len(failed_ids)).exec()
# TODO MSV-2041: move this to alignment.py def _selectPattern(self, pattern): """ Search for pattern in alignment and select matching residues :param pattern: PROSITE pattern (see `protein.sequence.find_generalized_pattern` for documentation). :type pattern: str :returns: 2-tuple of success and error message :rtype: tuple(bool, str) """ aln = self.getAlignment() if len(aln) == 0: return (False, "No sequences found to search.") matching_residues = aln.findPattern(pattern) if len(matching_residues) == 0: return (False, "There were no matches for the specified pattern.") # TODO MSV-1508 this should be invalidated whenever selection changes self._current_pattern_idx = None with command.compress_command(self.undo_stack, "Select Pattern-Matching Residues"): aln.res_selection_model.clearSelection() aln.res_selection_model.setSelectionState(matching_residues, True) return (True, "")
def movePattern(self, forward=True):
    """
    Scroll view to next or previous pattern instance.

    The selected residues (gaps excluded) are ordered by sequence position
    in the alignment, then residue number, then insertion code. Stepping
    past either end wraps around. Nothing happens if no residues are
    selected.

    :param forward: whether to move pattern view forward
    :type forward: bool
    """
    aln = self.getAlignment()
    selected_residue_set = aln.res_selection_model.getSelection()
    if not selected_residue_set:
        return

    def residue_sort(res):
        return (aln.index(res.sequence), res.resnum, res.inscode)

    selected_residues = sorted(
        (res for res in selected_residue_set if res.is_res),
        key=residue_sort)
    if not selected_residues:
        # The selection contains only gaps: there is nothing to scroll to,
        # and the wraparound modulo below would divide by zero.
        return

    current_index = self._current_pattern_idx
    if forward:
        new_index = current_index + 1 if current_index is not None else 0
    else:
        new_index = current_index - 1 if current_index is not None else -1

    # Modulo to allow wraparound
    new_index = new_index % len(selected_residues)
    next_res = selected_residues[new_index]
    res_model_index = self.view.model().getIndexForRes(next_res)

    # Scroll view to current index
    self.view.scrollTo(res_model_index,
                       QtWidgets.QAbstractItemView.PositionAtCenter)

    # Update index
    self._current_pattern_idx = new_index
def check_if_can_retry_blast(dialog_parent, task, blast_local_only):
    """
    Show the dialog appropriate for a failed BLAST task and report whether
    the search should be retried on the remote server.

    - Task ran remotely: warn that the remote search failed; no retry.
    - Only local searches are allowed: show the local "no results" message
      box; no retry.
    - Local search found no hits: let the remote BLAST message box decide.
    - Any other local failure: ask whether to continue remotely.

    :param dialog_parent: Widget providing `warning` and `question` and
        used as the parent of any message box
    :param task: The failed BLAST task
    :param blast_local_only: Whether BLAST searches may only be run locally
    :type blast_local_only: bool

    :return: Whether the task should be retried remote
    :rtype: bool
    """
    settings = task.input.settings
    error_msg = str(task.failure_info.exception)

    if settings.location == blast.REMOTE:
        # Already tried remote - show error dialog
        text = f"Searching the remote BLAST server failed:\n\n{error_msg}"
        dialog_parent.warning(
            title=f"Remote BLAST Search Failed - {task.getQueryName()}",
            text=text)
        return False

    if blast_local_only:
        # Local failed but remote is not allowed - error out
        dialogs.LocalBlastNoResultsMessageBox(dialog_parent).exec()
        return False

    if isinstance(task.failure_info.exception, blast.NoBlastHitsError):
        # Local had no hits - prompt for remote (unless do not ask again)
        return dialogs.RemoteBlastMessageBox(dialog_parent).exec()

    # Local failed - prompt for remote retry
    return dialog_parent.question(
        "Searching the local BLAST server failed:\n\n%s\n\n"
        "The search will continue on the remote server." % error_msg,
        title=f"Local BLAST Search Failed - {task.getQueryName()}")
def get_svg_renderer(widget):
    """
    Build an SVG renderer holding a vectorized image of a widget.

    The widget is rendered into an in-memory SVG document, so the result
    can later be painted onto a paint device at very high resolutions.

    :param widget: a widget that can be converted into an svg. This needs a
        render method
    :type widget: QtWidgets.QWidget

    :return: the svg renderer
    :rtype: QtSvg.QSvgRenderer
    """
    # In-memory device that receives the generated SVG
    svg_buffer = QtCore.QBuffer()
    svg_buffer.open(QtCore.QIODevice.ReadWrite)

    generator = QtSvg.QSvgGenerator()
    generator.setSize(widget.size())
    view_box = QtCore.QRect(0, 0, widget.width(), widget.height())
    generator.setViewBox(view_box)
    generator.setOutputDevice(svg_buffer)

    svg_painter = QtGui.QPainter()
    svg_painter.begin(generator)
    widget.render(svg_painter)
    svg_painter.end()

    # Rewind so the whole document is read back
    svg_buffer.seek(0)
    return QtSvg.QSvgRenderer(svg_buffer.readAll())
def save_pdf(renderer, file_name):
    """
    Render an image to a PDF file.

    The page is sized to the renderer's default size with zero margins.

    :param renderer: the renderer that will draw the image
    :type renderer: QtSvg.QSvgRenderer

    :param file_name: the name of the file to write to
    :type file_name: str
    """
    QPrinter = QtPrintSupport.QPrinter
    pdf_printer = QPrinter(QPrinter.HighResolution)
    pdf_printer.setOutputFormat(QPrinter.PdfFormat)
    pdf_printer.setOutputFileName(file_name)
    pdf_printer.setFontEmbeddingEnabled(True)
    pdf_printer.setFullPage(False)
    page_size = QtGui.QPageSize(renderer.defaultSize())
    pdf_printer.setPageSize(page_size)
    pdf_printer.setPageMargins(0, 0, 0, 0, QPrinter.DevicePixel)

    pdf_painter = QtGui.QPainter(pdf_printer)
    renderer.render(pdf_painter)
    pdf_painter.end()
def save_png(renderer, file_name, dpi):
    """
    Draw an image from the renderer to the given file name as a png

    The renderer's default size is scaled by the ratio of `dpi` to the
    primary screen's logical DPI to get the image size in pixels.

    :param renderer: the renderer that will draw the image
    :type renderer: QtSvg.QSvgRenderer

    :param file_name: the name of the file to write to
    :type file_name: str

    :param dpi: how many dpi to export to PNG
    :type dpi: int
    """
    screen = QtWidgets.QApplication.instance().primaryScreen()
    scale = dpi / screen.logicalDotsPerInch()
    default_size = renderer.defaultSize()
    w = int(default_size.width() * scale)
    h = int(default_size.height() * scale)
    im = QtGui.QImage(w, h, QtGui.QImage.Format_ARGB32)
    # A QImage created from a size holds uninitialized pixel data. Clear it
    # before painting so regions the renderer leaves untouched come out
    # transparent instead of containing garbage.
    im.fill(QtCore.Qt.transparent)
    painter = QtGui.QPainter(im)
    renderer.render(painter)
    painter.end()
    im.save(file_name, format="PNG", quality=100)