Source code for schrodinger.application.msv.gui.homology_modeling.hm_models

"""
`parameters` models for homology modeling
"""
import collections
import copy
import itertools
import logging
import os
from typing import List

import inflect

import schrodinger
import schrodinger.utils.log as log
from schrodinger import structure
from schrodinger.application.msv.gui import picking
from schrodinger.application.msv.gui.homology_modeling import constants
from schrodinger.application.msv.gui.homology_modeling.constants import \
    HeteromultimerMode
from schrodinger.application.msv.gui.homology_modeling.constants import Mode
from schrodinger.application.msv.gui.homology_modeling.constants import HomologyStatus
from schrodinger.application.msv.gui.picking import PickMode
from schrodinger.application.prime import wrapper as prime_wrapper
from schrodinger.infra import jobhub
from schrodinger.job import jobcontrol
from schrodinger.models import adapters
from schrodinger.models import json
from schrodinger.models import parameters
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.annotation import make_ligand_name_atom
from schrodinger.Qt import QtCore
from schrodinger.structutils import analyze
from schrodinger.structutils.build import delete_zobs
from schrodinger.tasks import jobtasks
from schrodinger.tasks import queue
from schrodinger.tasks import tasks
from schrodinger.utils import fileutils
from schrodinger.utils import mmutil

logger = log.get_output_logger("msv2_homology_panel")
if schrodinger.in_dev_env():
    logger.setLevel(logging.DEBUG)

maestro = schrodinger.get_maestro()

HOMOLOGY_INP_EXT = ".inp"
HOMOLOGY_OUT_SUFFIX = "-out.mae"
HOMOLOGY_PROP = "s_psp_Homology_Source"
VIEWNAME = "MSV2_homology_modeling"
ENTRY_TITLE_KEY = 'HOMOLOGY_ENTRY_TITLE'

PROTEIN_CHAIN_ASL = 'protein AND chain "{}"'
NON_PROTEIN_ASL = 'fillmol (NOT protein AND within 5.0 ({protein}))'


[docs]def get_seq_display_name(seq): """ Formats sequence name and chain (if set) for display """ name_parts = [seq.name] chain = seq.chain.strip() if chain: name_parts.append(chain) return "_".join(name_parts)
def _get_file_base(input_filename): """ The backend uses the extensionless basename of the first input file to name other files """ return fileutils.splitext(os.path.basename(input_filename))[0]
[docs]class InputConfigAdapter(adapters.ParamToInputConfig): STRING_BOOL_KWDS = [ 'build_deletions', 'build_insertions', 'build_transitions', 'keep_rotamers', 'minimize', 'side_opt', 'template_residue_numbers' ] @classmethod def _paramToDict(cls, param): param_dict = super()._paramToDict(param) # Change enum to string and rename keyword method = param_dict.pop('prime_method') knowledge_based = 'true' if method is constants.PrimeMethod.KNOWLEDGE else 'false' param_dict['knowledge_based'] = knowledge_based # Change bool to string for key in cls.STRING_BOOL_KWDS: bool_val = param_dict[key] assert isinstance(bool_val, bool) param_dict[key] = 'true' if bool_val else 'false' return param_dict
[docs]class EmptyPrimeConfig(prime_wrapper.PrimeConfig): """ Remove the spec from PrimeConfig. The spec contains keys that can only be passed once in the input file, but `TEMPLATE_NAME` must be present in the input file for each template structure. Therefore, we create a new EmptyPrimeConfig object for each template and append them to the same input file. """ specs = []
[docs] def validateValues(self, *args, **kwargs): """ No-op to prevent traceback due to empty spec """
[docs]class Ligand(parameters.CompoundParam): """ :param name: Unique ligand name including chain and residue number :param pdbres: Ligand residue name :param chain: Ligand chain name :param resnum: Ligand residue number :param molnum: Ligand molecule number :param asl: ASL representing the ligand :param structure_name: Name of the structure (excluding chain) :param cofactor: Whether the het is not a ligand as defined by the ligand preferences """ name: str = None pdbres: str = None chain: str = None resnum: int = None molnum: int = None asl: str = None structure_name: str = None cofactor: bool = False
[docs]class LigandMolecule(parameters.CompoundParam): """ Ligand for display (combines ligands with the same molnum) """ name: str = None structure_name: str = None source_ligands: list @property def cofactor(self): return any(lig.cofactor for lig in self.source_ligands)
[docs]class LigandConstraint(parameters.CompoundParam): res: residue.Residue = None ligname: str = None
[docs]class TargetTemplatePair(parameters.CompoundParam): target_seq: sequence.ProteinSequence = None template_seq: sequence.ProteinSequence = None identity: float ligands: List[Ligand]
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._prime_template = None self._has_waters = False self.target_seqChanged.connect(self.updateIdentity) self.template_seqChanged.connect(self._onTemplateSeqChanged) self._onTemplateSeqChanged() # changed not emitted if set on init
@property def is_valid(self): """ Whether both the target and template are defined """ return (self.target_seq is not None and self.template_seq is not None and self.template_seq.hasStructure()) @property def has_waters(self): """ Whether the template structure has waters :rtype: bool """ return self._has_waters @property def alignment_quality(self): if self.identity is None: return None identity_percent = self.identity * 100 if identity_percent < constants.WEAK_CUTOFF: quality = constants.AlignmentQuality.WEAK elif identity_percent < constants.LOW_CUTOFF: quality = constants.AlignmentQuality.LOW else: quality = constants.AlignmentQuality.ACCEPTABLE return quality
[docs] def updateIdentity(self): if self.is_valid: identity = self.target_seq.getIdentity(self.template_seq, consider_gaps=False) else: identity = None self.identity = identity
[docs] def getPrimeTemplateName(self, suffix=None): """ Get the name that will be used to identify this prime template. :param suffix: Optional suffix for alignment name (needed if more than one target uses the same template) :type suffix: int or NoneType """ template = self._prime_template name_parts = [template.name, template.chain] if suffix is not None and suffix > 0: name_parts.append(str(suffix)) return "_".join(name_parts)
def _getPrimeAlignment(self, suffix=None): target_seq = self.target_seq template = self._prime_template target_str, template_str = self._padSequences(str(target_seq), template.sequence) template.sequence = template_str ali_name = self.getPrimeTemplateName(suffix=suffix) ali = prime_wrapper.BasePrimeAlignment(ali_name, target_str) ali.setTemplate(template) return ali
[docs] def writeInputForPrime(self, taskdir, suffix=None): """ Convert the sequences into prime format and write them to file. :param suffix: Optional suffix for alignment name (needed if more than one target uses the same template) :type suffix: int or NoneType :return: Prime alignment :rtype: schrodinger.application.prime.wrapper.BasePrimeAlignment """ ali = self._getPrimeAlignment(suffix=suffix) ali.setWorkingDirectory(taskdir) ali.writeInputForPrime() return ali
[docs] def writeInputForConsensus(self, file_name, append=False): """ Convert the sequences into prime format and write them to file. :param file_name: Alignment file name to write (absolute path) :type file_name: str :param append: Whether to create a new alignment file or append :type append: bool :return: Prime alignment :rtype: schrodinger.application.prime.wrapper.BasePrimeAlignment """ ali = self._getPrimeAlignment() taskdir, file_name = os.path.split(file_name) ali.setWorkingDirectory(taskdir) ali.writeInputForConsensus(file_name=file_name, append=append) return ali
[docs] def getSelectedLigandsForPair(self, all_selected_ligands): """ Get the names of the selected ligands belonging to this pair :param all_selected_ligands: Selected ligands across all pairs :type all_selected_ligands: collections.abc.iterable(Ligand) :rtype: list[Ligand] """ return sorted( (lig for lig in all_selected_ligands if lig in self.ligands), key=lambda l: (l.chain, l.resnum))
def _onTemplateSeqChanged(self): """ Cache PrimeTemplate for new template seq and update ligand list """ self.updateIdentity() if self.template_seq is None or not self.template_seq.hasStructure(): return self._prime_template = self._extractTemplate() self._has_waters = bool(self._prime_template.waters) st = self._prime_template.st.copy() delete_zobs(st) all_ligands = self._getLigands(st) if all_ligands: # Prime ligands includes metals and other non-ligand hets true_ligands = analyze.find_ligands(st) true_ligand_pdbres = {lig.pdbres for lig in true_ligands} ligand_names = set() ligands = [] for lig in all_ligands: ai = lig.atom_indexes[0] name = make_ligand_name_atom(st, ai) if name in ligand_names: continue ligand_names.add(name) lig_atom = st.atom[ai] ligand = Ligand(name=name, pdbres=lig_atom.pdbres, chain=lig_atom.chain, resnum=lig_atom.resnum, asl=lig.ligand_asl, molnum=lig_atom.molecule_number_by_entry, structure_name=self._prime_template.name, cofactor=lig.pdbres not in true_ligand_pdbres) ligands.append(ligand) for ligand in self._getNucleicAcidLigands(st, known_hets=ligand_names): ligand.structure_name = self._prime_template.name ligands.append(ligand) self.ligands = ligands def _getLigands(self, st): return analyze.find_ligands(st, **self._prime_template.FIND_LIGANDS_KWARGS) def _getNucleicAcidLigands(self, st, known_hets=None): if known_hets is None: known_hets = set() atoms = analyze.evaluate_asl(st, "nucleic_acids") for ai in atoms: name = make_ligand_name_atom(st, ai) if name in known_hets: continue known_hets.add(name) lig_atom = st.atom[ai] ligand = Ligand( name=name, pdbres=lig_atom.pdbres, chain=lig_atom.chain, resnum=lig_atom.resnum, molnum=lig_atom.molecule_number_by_entry, ) yield ligand def _extractTemplate(self): """ :return: Prime wrapper template object :rtype: prime_wrapper.PrimeTemplate """ template_seq = self.template_seq seq_name = template_seq.name.strip() or "template" seq_name = seq_name.replace(" ", "_") template = prime_wrapper.PrimeTemplate(seq_name) template.sequence = str(template_seq) chain_id = template_seq.structure_chain full_st = template_seq.getStructure() protein_asl = PROTEIN_CHAIN_ASL.format(chain_id) protein_atoms = analyze.evaluate_asl(full_st, protein_asl) if not protein_atoms: raise ValueError(f"No protein atoms with chain {chain_id} found.") protein_st = full_st.extract(protein_atoms) non_protein_atoms = analyze.evaluate_asl( full_st, NON_PROTEIN_ASL.format(protein=protein_asl)) non_protein_atoms = set(non_protein_atoms).difference(protein_atoms) if non_protein_atoms: non_protein_st = full_st.extract(non_protein_atoms) # Force protein atoms to be at the beginning (a single extract keeps # original atom ordering) protein_st.extend(non_protein_st) template.st = protein_st return template def _padSequences(self, *sequence_strings): max_len = max(map(len, sequence_strings)) return [mystr.ljust(max_len, "~") for mystr in sequence_strings]
[docs]class LigandDialogModel(parameters.CompoundParam): ligands: List[LigandMolecule] selected_ligands: List[LigandMolecule] has_waters: bool = False include_waters: bool = False constrain_ligand: bool = False ligand_constraints: List[LigandConstraint] pick_ligand: bool = False
[docs] @classmethod def getJsonBlacklist(cls): """ @overrides: tasks.AbstractCmdTask """ return [ cls.pick_ligand, ]
[docs] def getSelectedLigands(self): return list( itertools.chain.from_iterable( ligmol.source_ligands for ligmol in self.selected_ligands))
[docs]class PrimeSettings(parameters.CompoundParam): """ Homology settings that are written to the prime input file """ prime_method: constants.PrimeMethod build_deletions: bool = True build_insertions: bool = True build_transitions: bool = True keep_rotamers: bool = True max_insertion_size: int = 20 minimize: bool = True num_output_struct: int = 1 side_opt: bool = True template_residue_numbers: bool = False
[docs] def getSummaryText(self): """ Get a string summarizing the settings. Prime method and the number of output structures will be shown; all other options will be grouped under custom or default. """ method_kw = "prime_method" num_kw = "num_output_struct" primary_keywords = (method_kw, num_kw) current_settings = self.toDict() method_text = current_settings.pop(method_kw).value num_models = current_settings.pop(num_kw) model_text = inflect.engine().no("model", num_models) # Only show "custom options" for secondary options default_settings = type(self).defaultValue().toDict() for kw in primary_keywords: del default_settings[kw] secondary_default = current_settings == default_settings default_text = "defaults" if secondary_default else "custom options" return f"{method_text}, {model_text}, {default_text}"
[docs] def initConcrete(self): super().initConcrete() self.prime_methodChanged.connect(self._onPrimeMethodChanged)
def _onPrimeMethodChanged(self, prime_method): """ Reset num_output_struct for energy-based modeling """ if prime_method is constants.PrimeMethod.ENERGY: M = self.getAbstractParam() self.num_output_struct = M.num_output_struct.defaultValue()
[docs]class HomologyModelingSettings(parameters.CompoundParam): prime_settings: PrimeSettings # Other settings include_hets: bool = True add_residue_labels: bool = False ligand_dlg_model: LigandDialogModel set_constraints: bool = False proximity_constraints: List pick_proximity: bool = False
[docs] def initConcrete(self): super().initConcrete() self.pick_proximityChanged.connect(self._onPickChanged) self.set_constraintsChanged.connect(self._onSetConstraintsChanged)
[docs] def reset(self, *args): if not args: # by default, don't reset ligand dialog model cls = type(self) args = [ subparam for _, subparam in cls.getSubParams().items() if not isinstance(subparam, LigandDialogModel) ] super().reset(*args)
def _onPickChanged(self, pick): """ When enabling picking, also enable set constraints """ if pick: self.set_constraints = True def _onSetConstraintsChanged(self, set_constraints): """ When disabling set constraints, also disable picking """ if not set_constraints: self.pick_proximity = False
[docs] @classmethod def getJsonBlacklist(cls): """ @overrides: tasks.AbstractCmdTask """ return [ cls.pick_proximity, ]
[docs] @json.adapter(version=49009) def adapter49009(cls, json_dict): json_dict.pop("refine_loops") return json_dict
[docs] @json.adapter(version=51105) def adapter51105(cls, json_dict): # commit 48aeafc1e7dfa4b9579cb34b5838c1e4d66555ea removed # `has_constraints` and moved `set_constraints` and `pick_proximity` # from PrimeSettings to HomologyModelingSettings prime_settings = json_dict["prime_settings"] prime_settings.pop("has_constraints", None) for key in ("set_constraints", "pick_proximity"): json_dict[key] = prime_settings.pop(key, False) return json_dict
[docs]class HomologyModelingInput(parameters.CompoundParam): mode: constants.Mode heteromultimer_mode: constants.HeteromultimerMode target_template_pairs: List[TargetTemplatePair] composite_regions: list pick_chimera: bool = False ready: bool = False settings: HomologyModelingSettings composite_residues = parameters.NonParamAttribute()
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._aln = None # single-shot timer for multiple Changed signals to only update once self._aln_changed_timer = QtCore.QTimer() self._aln_changed_timer.setSingleShot(True) self._aln_changed_timer.setInterval(0) self._aln_changed_timer.timeout.connect(self._updatePairs) self.modeChanged.connect(self._onModeChanged) self.heteromultimer_modeChanged.connect(self._updatePairs) self.composite_regionsChanged.connect(self._onCompositeRegionsChanged) # TODO MSV-2541 more sophisticated invalidation of chimeric regions # when pairs change self.target_template_pairsChanged.connect( # TODO PANEL-14101: after concrete listparams can truncate args, # remove this lambda lambda: self.composite_regions.clear())
[docs] @classmethod def getJsonBlacklist(cls): """ @overrides: tasks.AbstractCmdTask """ return [ cls.pick_chimera, ]
[docs] @json.adapter(version=51082) def adapter51082(cls, json_dict): # commit e156abb4760d4eed5fd8920bfd3d045ae57d23f3 moved # `ligand_dlg_model` from HomologyModelingInput to # HomologyModelingSettings lig_key = 'ligand_dlg_model' json_dict['settings'][lig_key] = json_dict.pop(lig_key) return json_dict
[docs] def getMinAlignmentQuality(self): """ :return: The minimum alignment quality of all alignment pairs or None if no alignment pairs exist :rtype: constants.AlignmentQuality or NoneType """ return get_min_alignment_quality(self.target_template_pairs)
[docs] def hasGaps(self): """ :return: Whether any of the pairs have gaps :rtype: bool """ for pair in self.target_template_pairs: if pair.target_seq.getGaps(): return True if pair.template_seq.getGaps(): return True return False
[docs] def getNumRegions(self): """ :return: The number of chimeric regions :rtype: int """ return len(self.composite_regions)
[docs] def getCompositeArray(self): """ :return: Composite array specifying 1-based template number for each target position and the pairs used in the array :rtype: str, list[TargetTemplatePair] """ pair = self.target_template_pairs[0] target_seq = pair.target_seq def get_template_number(res): seq = res.sequence for idx, pair in enumerate(self.target_template_pairs): if seq is pair.template_seq: return str(idx + 1) composites = ['1' for _ in target_seq.residues()] for region in self.composite_regions: template_number = None for res in region: if template_number is None: template_number = get_template_number(res) absolute_column_idx = res.idx_in_seq target_res = target_seq[absolute_column_idx] if target_res.is_gap: continue target_column_idx = target_res.gapless_idx_in_seq composites[target_column_idx] = template_number used_pairs = [] used_template_numbers = set(composites) used_template_numbers.add('1') offset = 0 composite_str = ''.join(composites) for idx, pair in enumerate(self.target_template_pairs): template_num = str(idx + 1) if template_num not in used_template_numbers: offset += 1 continue else: used_pairs.append(pair) if offset: new_num = str(int(template_num) - offset) composite_str = composite_str.replace(template_num, new_num) return composite_str, used_pairs
[docs] def updateHomologyStatus(self): """ Update aln homology status to reflect the current target template pairs """ aln = self._aln if aln is None: return aln.resetHomologyCache() for pair in self.target_template_pairs: target_seq = pair.target_seq if target_seq is not None: aln.setHomologyStatus(target_seq, HomologyStatus.Target) template_seq = pair.template_seq if template_seq is not None: aln.setHomologyStatus(template_seq, HomologyStatus.Template)
[docs] def getSeqsToAlign(self): """ Get sequences for sequence alignment """ seqs = set() for pair in self.target_template_pairs: seqs.add(pair.target_seq) seqs.add(pair.template_seq) seqs.discard(None) return seqs
[docs] def getSeqsToStructureAlign(self): """ Get sequences for structure alignment """ return [ pair.template_seq for pair in self.target_template_pairs if pair.template_seq is not None ]
[docs] def getLigandAndWaterSettings(self): """ Get all selected ligands and whether the Prime job should include water molecules. :return: A tuple of - A list of all selected ligands - Whether waters should be included :rtype: tuple(list[Ligand], bool) """ if not self.settings.include_hets: return ([], False) return (self.settings.ligand_dlg_model.getSelectedLigands(), self.settings.ligand_dlg_model.include_waters)
########################## # TARGET/TEMPLATE METHODS ########################## def _setAlignment(self, aln): if aln == self._aln: return if self._aln is not None: for signal, slot in self._getAlnSignalsAndSlots(self._aln): signal.disconnect(slot) if aln is not None: for signal, slot in self._getAlnSignalsAndSlots(aln): signal.connect(slot) self._aln = aln self._updatePairs() self._updateCompositeRegions() self._updateLigandConstraints() self._updateProximityConstraints() def _getAlnSignalsAndSlots(self, aln): aln_signals = aln.signals ss = [ (aln.signals.homologyCompositeResiduesChanged, self._updateCompositeRegions), (aln.signals.homologyLigandConstraintsChanged, self._updateLigandConstraints), (aln.signals.homologyProximityConstraintsChanged, self._updateProximityConstraints), (aln.signals.residuesAdded, self._fixChimericPick), (aln.signals.residuesRemoved, self._fixChimericPick), (aln.signals.sequenceResiduesChanged, self._fixChimericPick), ] # yapf: disable update_slot = self._aln_changed_timer.start update_signals = [ aln_signals.sequenceNameChanged, aln_signals.sequencesInserted, aln_signals.sequencesRemoved, aln_signals.sequencesReordered, aln_signals.sequenceResiduesChanged, aln_signals.alignmentNumColumnsChanged, aln_signals.residuesRemoved, aln_signals.residuesAdded, aln.seq_selection_model.selectionChanged, aln_signals.sequenceStructureChanged, ] ss.extend((signal, update_slot) for signal in update_signals) return ss def _updatePairs(self): """ Updates the panel model based on the alignment and the current mode. """ aln = self._aln if aln is None: return logger.debug("Updating the model based on the alignment and mode") ref_seq = aln.getReferenceSeq() if ref_seq is None: pairs = [] elif self._isOneOne(): # Use reference sequence as target # Use first structured seq as template target_seq = ref_seq template_seq = next(get_possible_templates(aln), None) pair = TargetTemplatePair(target_seq=target_seq, template_seq=template_seq) pairs = [pair] elif self.isChimera() or self.mode in (Mode.HOMOMULTIMER, Mode.CONSENSUS): # Use reference sequence as target # Use all subsequent structured seqs as template pairs = [] target_seq = ref_seq for struct_seq in get_possible_templates(aln): pairs.append( TargetTemplatePair(target_seq=target_seq, template_seq=struct_seq)) if (self.isChimera() and len(pairs) == constants.MAX_CHIMERIC_TEMPLATES): break if target_seq is not None and not pairs: pairs.append(TargetTemplatePair(target_seq=target_seq)) elif self.mode is Mode.MANY_ONE: # Use structured reference sequence as template # Use all (selected) subsequent sequences as targets if not ref_seq.hasStructure(): # Special case for unstructured reference seq pairs = [TargetTemplatePair(target_seq=ref_seq)] template_seq = None else: pairs = [] template_seq = ref_seq for target_seq in get_selected_nonref_seqs(aln): if target_seq.hasStructure(): continue pairs.append( TargetTemplatePair(target_seq=target_seq, template_seq=template_seq)) else: pairs = [] self.target_template_pairs = pairs self.updateHomologyStatus() self.ready = self._checkIfReady() self._updateIdentity() self._updateLigands() def _updateCompositeRegions(self): """ Transform composite residues into a list of composite regions. A region is a contiguous stretch of residues in a non-default template sequence. """ if self._aln is None: return composite_residues = self._aln.homology_composite_residues if not composite_residues: self.composite_residues = [] return pickable_seqs = list( picking.get_pickable_seqs(self._aln, PickMode.HMChimera)) if not pickable_seqs: return default_template = pickable_seqs[0] pickable_seqs = set(pickable_seqs) # Filter out residues not in pickable seqs residues = ( res for res in composite_residues if res.sequence in pickable_seqs) # Sort by column index residues = sorted(residues, key=lambda r: r.idx_in_seq) # Split into groups based on sequence changing seq_groups = itertools.groupby(residues, key=lambda r: r.sequence) # Listify and filter out residues in default template regions = [ list(group) for seq, group in seq_groups if seq != default_template ] self.composite_regions = regions def _onCompositeRegionsChanged(self): if self.mode is Mode.CHIMERA: self.ready = self._checkIfReady() def _fixChimericPick(self): aln = self._aln if not self.isChimera() or not aln.homology_composite_residues: return to_add, to_remove = picking.fix_chimeric_pick(aln) aln.updateHomologyCompositeResidues(to_add=to_add, to_remove=to_remove) def _updateLigandConstraints(self): if self._aln is None: return if not self._aln.hasHMLigandConstraints(): self.settings.ligand_dlg_model.ligand_constraints = [] return lig_constraints = [] for res, ligname in self._aln.hm_ligand_constraints: lig_constraints.append(LigandConstraint(res=res, ligname=ligname)) self.settings.ligand_dlg_model.ligand_constraints = lig_constraints def _updateProximityConstraints(self): if self._aln is None: return prox_constraints = self._aln.proximity_constraints self.settings.proximity_constraints = list(prox_constraints.getPairs()) def _checkIfReady(self): """ For the current homology modeling mode, return whether the input is ready (i.e. the task can be run). :return: Whether the input is ready :rtype: bool """ pairs = self.target_template_pairs ready = False if len(pairs) == 0: return False if self._isOneOne(): current_pair = pairs[0] ready = current_pair.is_valid else: ready = len(pairs) >= 2 and all(pair.is_valid for pair in pairs) if ready and self.isChimera(): ready = self.getNumRegions() > 0 return ready def _updateLigands(self): all_ligands = [] to_select = [] lig_model = self.settings.ligand_dlg_model has_waters = False seen_names = set() for pair in self.target_template_pairs: pair_ligmols = [] key = lambda pair: pair.molnum for _, lig_group in itertools.groupby(sorted(pair.ligands, key=key), key=key): lig_group = list(lig_group) structure_name = lig_group[0].structure_name name = "\n".join(lig.name for lig in lig_group) if name in seen_names: # One ligand can contact multiple protein chains but # only show it once continue seen_names.add(name) ligmol = LigandMolecule(name=name, structure_name=structure_name, source_ligands=lig_group) pair_ligmols.append(ligmol) all_ligands.extend(pair_ligmols) if not any(lig in lig_model.ligands for lig in pair_ligmols): # Only select new ligands to_select.extend(lig for lig in pair_ligmols if not any( source_lig.cofactor for source_lig in lig.source_ligands)) has_waters |= pair.has_waters if self.mode is Mode.MANY_ONE: # In many-one (batch) mode, all templates are identical so skip # the rest of the pairs break lig_model.has_waters = has_waters # update selected ligands to new ligand objects that are equal to # previously selected ligands new_selection = [ lig for lig in all_ligands if lig in lig_model.selected_ligands ] new_selection.extend(to_select) # Since we're replacing selected ligands anyway, clear selected ligands # before updating ligands to avoid view/model sync issues lig_model.selected_ligands.clear() # The next line works around MSV-3690/PANEL-19188 lig_model.ligands.clear() lig_model.ligands = all_ligands lig_model.selected_ligands = new_selection def _updateIdentity(self): """ Update the identity of the active alignment(s) """ for pair in self.target_template_pairs: pair.updateIdentity() def _onModeChanged(self, mode): self._updatePairs() if mode is Mode.HETEROMULTIMER: return # Set heteromultimer mode to Chimera when setting the mode to Chimera if mode is Mode.CHIMERA: heteromultimer_mode = HeteromultimerMode.CHIMERA else: M = type(self) heteromultimer_mode = M.heteromultimer_mode.defaultValue() self.heteromultimer_mode = heteromultimer_mode def _isOneOne(self): return self.mode is Mode.ONE_ONE or ( self.mode is Mode.HETEROMULTIMER and self.heteromultimer_mode is HeteromultimerMode.ONE_ONE)
[docs] def isChimera(self): return self.mode is Mode.CHIMERA or ( self.mode is Mode.HETEROMULTIMER and self.heteromultimer_mode is HeteromultimerMode.CHIMERA)
[docs] def usesLigands(self): """ Return whether the mode can use ligands (regardless of whether ligands are present) """ return self.mode is not Mode.CONSENSUS
def _get_hm_job_config(): return jobtasks.job_config_factory( default_incorp_mode=jobtasks.IncorporationMode.APPEND if maestro else jobtasks.IncorporationMode.IGNORE, viewname=VIEWNAME, supports_subjobs=True)
[docs]class HMLauncherTask(jobtasks._AbstractJobMixin, tasks.SignalTask): """ Task to launch homology modeling jobtasks. Inherits from `jobtasks._AbstractJobMixin` to get `jobtasks.is_jobtask` to return True, but does not run a job directly. """ job_config = _get_hm_job_config() # the subtask is a param, but we don't want it to be a subparam, so store # it as a non-param attribute subtask = parameters.NonParamAttribute() def __eq__(self, other): # Bypassing the AbstractTask implementation of `__eq__` because it # shallow copies both tasks, which was throwing a traceback. return super(object, self).__eq__(other) @tasks.preprocessor def _setUpSubtask(self): # Copy settings from parent task to child task self.subtask.specifyTaskDir(self.getTaskDir()) self.subtask.job_config.setValue(self.job_config) # Hook up subtask signals self.subtask.taskDone.connect(self.mainDone) self.subtask.taskFailed.connect(self.mainDone) # Preprocess subtask so it happens at the correct time self.subtask.runPreprocessing()
[docs] def setUpMain(self): self.subtask.start(skip_preprocessing=True)
[docs] def write(self, skip_preprocessing=False): if not skip_preprocessing: # Run launcher task preprocessing to set up and preprocess subtask self.runPreprocessing() # Delegate write to subtask self.subtask.write(skip_preprocessing=True)
[docs] def stop(self): # Delegate stop to subtask self.subtask.stop()
[docs]class BasePrimeTask(jobtasks.CmdJobTask): """ Task that runs prime on `self.input_files`. Caller is responsible for writing and setting `self.input_files`. """ input_files: List[tasks.TaskFile] output_file: tasks.TaskFile job_config = _get_hm_job_config()
[docs] def initConcrete(self): super().initConcrete() self.input_filesChanged.connect(self._updateOutputFiles)
def _updateOutputFiles(self): """ When input files change, update the output filename """ if not self.input_files: self.output_file = "" return input_filename = self.input_files[0] file_base = _get_file_base(input_filename) self.output_file = file_base + HOMOLOGY_OUT_SUFFIX
[docs] def makeCmd(self): """ @overrides: tasks.AbstractCmdTask """ return ['prime'] + self.input_files
[docs]class BaseHomologyTaskMixin(parameters.CompoundParamMixin): """ Mixin with shared methods for all homology modeling tasks Subclasses must inherit from a Task class and define the appropriate task methods (e.g. makeCmd/mainFunction, preprocessors). """ aln = parameters.NonParamAttribute() input: HomologyModelingInput output_file: tasks.TaskFile job_id: str ENTRY_TITLE_FMT = "Model of {target} based on {templates}"
[docs] def initializeValue(self): super().initializeValue() self.name = "homology"
[docs] def initConcrete(self): super().initConcrete() self.aln = None
[docs] def getOutputStructureFiles(self): """ Get the task's output structure file(s). """ return [self.getTaskFilename(self.output_file)]
def _getModelName(self): tt_pairs = self.input.target_template_pairs target = get_seq_display_name(tt_pairs[0].target_seq) templates = [ get_seq_display_name(pair.template_seq) for pair in tt_pairs ] templates = ", ".join(templates) return self.ENTRY_TITLE_FMT.format(target=target, templates=templates)
[docs]class HomologyTaskMixin(BaseHomologyTaskMixin): """ Mixin with shared methods for simple homology modeling tasks. Subclasses must inherit from a JobTask class and define the appropriate task methods (e.g. makeCmd/mainFunction, preprocessors). :cvar ALLOW_DUPLICATE_LIGANDS: Whether to allow duplicate ligands. Should be False if all of the configs will be used to create one multimer. :vartype ALLOW_DUPLICATE_LIGANDS: bool """ file_prefix = parameters.NonParamAttribute() ALLOW_DUPLICATE_LIGANDS = False
[docs] def initConcrete(self): super().initConcrete() self.file_prefix = ""
def _getPrimeLogFilename(self, input_filename): file_base = _get_file_base(input_filename) log_file = file_base + ".log" return self.getTaskFilename(log_file) def _checkLogFile(self, input_filename): """ Parse the log file because the backend returns 0 when it fails """ log_file = self._getPrimeLogFilename(input_filename) with open(log_file) as fh: for line in fh: if "Homology model building job complete" in line: break else: raise tasks.TaskFailure( "Homology model building did not complete") ########################## # TASK METHODS ########################## def _onJobStarted(self, job): """ @overrides: _AbstractJobMixin """ super()._onJobStarted(job) if (maestro and mmutil.feature_flag_is_enabled(mmutil.NEW_INCORPORATION_INFRA)): jmgr = jobhub.get_job_manager() jmgr.setJobCompletionOptions(job.JobId, VIEWNAME, job.Project) def _getInputFilename(self, idx=None): file_name = self.file_prefix + self.name if idx: file_name = f"{file_name}_{idx}" return file_name + HOMOLOGY_INP_EXT def _writeInput(self): """ Generate a separate input file for each target-template pair. :return: File names (relative path) for all generated input files. :rtype: list[str] """ input_files = [] prime_configs = self._getPrimeConfigs() for i, cur_config in enumerate(prime_configs): file_name = self._getInputFilename(i) file_path = self.getTaskFilename(file_name) with open(file_path, 'w') as fh: cur_config.writeInputFile(fh, ignore_none=True, yesno=False) input_files.append(file_name) return input_files def _getPrimeConfigs(self): """ Create a Prime config object for each target-template pair. :return: All Prime config objects. :rtype: list[prime_wrapper.PrimeConfig] """ tt_pairs = self.input.target_template_pairs taskdir = self.getTaskDir() all_configs = [] seen_ligs = set() for template_number, pair in enumerate(tt_pairs, start=1): prime_config = self._getPrimeConfigWithSettings() self._addPairToConfig(prime_config, pair, taskdir, template_number) if not self.ALLOW_DUPLICATE_LIGANDS: # Remove ligands that were used in a previous config to_remove = set() for key, value in prime_config.items(): if "HETERO" in key: if value in seen_ligs: to_remove.add(key) seen_ligs.add(value) for key in to_remove: del prime_config[key] all_configs.append(prime_config) return all_configs def _getPrimeConfigWithSettings(self): """ Create a Prime config object with the current settings but no alignment loaded. :return: The Prime config object. :rtype: prime_wrapper.PrimeConfig """ prime_config = prime_wrapper.PrimeConfig() InputConfigAdapter.convert(self.input.settings.prime_settings, prime_config) return prime_config def _addPairToConfig(self, prime_config, pair, taskdir, template_number, suffix=None): """ Add a target-template pair to a Prime config object and write out the target-template alignment in Prime format. :param prime_config: The Prime config object to add the pair to. :type prime_config: prime_wrapper.PrimeConfig :param pair: The target-template pair to add. :type pair: TargetTemplatePair :param taskdir: The directory to write the Prime alignment to. :type taskdir: str :param template_number: The index of the target-template pair. :type template_number: int :param suffix: Suffix for prime alignment outfile name :type suffix: str """ settings = self.input.settings is_energy = (settings.prime_settings.prime_method is constants.PrimeMethod.ENERGY) selected_ligands, include_waters = self.input.getLigandAndWaterSettings( ) prime_alignment = pair.writeInputForPrime(taskdir, suffix=suffix) prime_config.setAlignment(prime_alignment, (prime_wrapper.NO_LIGANDS,), include_waters, template_number=template_number) crosslinks = [] if settings.include_hets and selected_ligands: ligands = pair.getSelectedLigandsForPair(selected_ligands) template_name = prime_config['TEMPLATE_NAME'] if include_waters: start_idx = sum(1 for key in prime_config if "HETERO" in key) else: start_idx = 0 lig_name_index_map = {} for idx, lig in enumerate(ligands, start=start_idx): lig_name_index_map[lig.name] = idx het_name = f'{template_name}_HETERO_{idx}' lig_id = f'LIG{lig.pdbres}{lig.chain}:{lig.resnum}' prime_config[het_name] = lig_id if is_energy and settings.ligand_dlg_model.constrain_ligand: lig_constraints = settings.ligand_dlg_model.ligand_constraints for constr in lig_constraints: lig_idx = lig_name_index_map[constr.ligname] # The backend takes ligand indexes starting at 500 lig_id = f"Z:{500 + lig_idx}" res_idx = constr.res.gapless_idx_in_seq res_id = f"A:{res_idx + 1}" crosslinks.append(f"{lig_id} {res_id}") if is_energy and settings.set_constraints: for res1, res2 in settings.proximity_constraints: ridx1 = res1.gapless_idx_in_seq + 1 ridx2 = res2.gapless_idx_in_seq + 1 crosslinks.append(f"A:{ridx1} A:{ridx2}") if is_energy and crosslinks: prime_config['CROSSLINK'] = " ".join(crosslinks) return prime_alignment
def _check_unique_template_names(pairs): """ Return whether all pairs' templates are unique by name and chain """ names_and_chains = [ (pair.template_seq.name, pair.template_seq.chain) for pair in pairs ] valid = len(set(names_and_chains)) == len(names_and_chains) if valid: return True msg = "Templates must have unique names and chains. Non-unique items: " non_unique = [ item for item, num in collections.Counter(names_and_chains).items() if num > 1 ] msg += ", ".join(["_".join(parts) for parts in non_unique]) return False, msg
[docs]class HomologyModelingTask(HomologyTaskMixin, BasePrimeTask): @tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def _checkUniqueTemplateNames(self): return _check_unique_template_names(self.input.target_template_pairs) @tasks.preprocessor(order=tasks.AFTER_TASKDIR) def _prepareInputAndOutput(self): """ Generate the input files and expected output file names. """ input_files = self._writeInput() self.input_files = input_files def _getPrimeConfigWithSettings(self): prime_config = super()._getPrimeConfigWithSettings() prime_config[ENTRY_TITLE_KEY] = self._getModelName() return prime_config @tasks.postprocessor def _checkFirstLogFile(self): self._checkLogFile(self.input_files[0])
[docs]class ChimeraHomologyModelingTask(HomologyModelingTask): ENTRY_TITLE_FMT = "Chimeric model of {target} based on {templates}" def _writeInput(self): """ Generate the input file for chimera mode. @overrides: HomologyTaskMixin :return: File names (relative path) for all generated input files. :rtype: list[str] """ prime_configs = self._getPrimeConfigs() file_name = self._getInputFilename() file_path = self.getTaskFilename(file_name) with open(file_path, 'w') as fh: for cur_config in prime_configs: cur_config.writeInputFile(fh, ignore_none=True, yesno=False) return [file_name] def _getPrimeConfigs(self): """ Create Prime config objects for chimera mode. These config objects should all be written to a single input file. @overrides: HomologyTaskMixin :return: All Prime config objects. :rtype: list[prime_wrapper.PrimeConfig] """ main_prime_config = self._getPrimeConfigWithSettings() composite_array, tt_pairs = self.input.getCompositeArray() main_prime_config['COMPOSITE_ARRAY'] = composite_array all_configs = [] all_configs.append(main_prime_config) taskdir = self.getTaskDir() for template_number, pair in enumerate(tt_pairs, start=1): prime_config = EmptyPrimeConfig() self._addPairToConfig(prime_config, pair, taskdir, template_number) all_configs.append(prime_config) return all_configs
[docs]class HeteromultimerHomologyModelingTask(BaseHomologyTaskMixin, BasePrimeTask):
[docs] class Input(parameters.CompoundParam): settings: HomologyModelingSettings input_list: List[HomologyModelingInput]
@tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def _checkUniqueTemplateNames(self): pairs = (pair for tab_input in self.input.input_list for pair in tab_input.target_template_pairs) return _check_unique_template_names(pairs) @tasks.preprocessor(order=tasks.AFTER_TASKDIR) def _prepareInputAndOutput(self): taskdir = self.getTaskDir() for idx, hm_input in enumerate(self.input.input_list): TaskCls = get_task_class(hm_input.heteromultimer_mode) subtask = TaskCls() if idx > 0: subtask.file_prefix = f"tab{idx}_" subtask.name = self.name subtask.input.setValue(hm_input) subtask.specifyTaskDir(taskdir) subtask.runPreprocessing() self.input_files.extend(subtask.input_files)
[docs]class BatchHomologyModelingTask(HomologyTaskMixin, jobtasks.ComboJobTask): job_config = _get_hm_job_config()
[docs] class Input(HomologyModelingInput): """ :ivar prime_input_files: List of prime .inp files """ prime_input_files: List[jobtasks.TaskFile] misc_input_files: List[jobtasks.TaskFile]
[docs] class Output(jobtasks.JobOutput): pass
ALLOW_DUPLICATE_LIGANDS = True ENTRY_TITLE_FMT = "Model of {target} based on {template}"
[docs] def initializeValue(self): super().initializeValue() self.name = "batch_homology"
[docs] def getOutputStructureFiles(self): """ @overrides: HomologyTaskMixin """ return [self.getTaskFilename(self.output.incorporation_file)]
@tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def _checkName(self): # The name "homology" is used for the subjobs, so prevent trying to # create homology.log for both parent and child task if self.name == "homology": return False, 'Batch task cannot be named "homology"' @tasks.preprocessor(order=tasks.AFTER_TASKDIR) def _prepareInputAndOutput(self): """ Generate the input files """ # Temporarily set name to homology to write subjob input files old_name = self.name self.name = "homology" try: # Note: `_writeInput` calls `_addPairToConfig` which # adds the aln and pdb files to `self.input_files` input_files = self._writeInput() finally: self.name = old_name input_files = [self.getTaskFilename(fn) for fn in input_files] # Store the prime input files separately self.input.prime_input_files = input_files
[docs] def mainFunction(self): subtasks = [] for in_fn in self.input.prime_input_files: subtask = BasePrimeTask() # Launch subtasks from the parent task jobdir (cwd) # because that's where all the prime input files are subtask.specifyTaskDir(None) subtask.input_files = [in_fn] subtasks.append(subtask) queue.autoname_tasks(subtasks) dj = queue.TaskDJ() for task in subtasks: dj.addTask(task) dj.run() queue.auto_register_files(subtasks, auto_file_mode=queue.AutoFileMode.ALL) # TODO after PANEL-19618, remove the next block for task in subtasks: if task.job_id: job_obj = jobcontrol.Job(task.job_id) jobcontrol.register_job_output(job_obj) succeeded_tasks = [] for task in subtasks: if task.status is task.DONE: succeeded_tasks.append(task) self.output.incorporation_file = "batch_homology_out.maegz" with structure.StructureWriter( self.output.incorporation_file) as writer: for task in succeeded_tasks: # Because the subtasks run in cwd, the relative output file # path can be used directly with structure.StructureReader(task.output_file) as reader: writer.extend(reader) num_done = len(succeeded_tasks) num_started = len(subtasks) if num_done < num_started: raise tasks.TaskFailure(f"{num_done} of {num_started} homology " "models successfully completed")
def _addPairToConfig(self, prime_config, pair, taskdir, template_number): """ @overrides: HomologyTaskMixin """ # many-one reuses the same template, so name and chain aren't # sufficient to identify suffix = template_number - 1 template_number = 1 prime_alignment = super()._addPairToConfig(prime_config, pair, taskdir, template_number, suffix=suffix) # Add aln and pdb files to input files so they're copied to the job dir self.input.misc_input_files.append( self.getTaskFilename(prime_alignment.getAlnFilename())) self.input.misc_input_files.append( self.getTaskFilename(prime_alignment.template.file_name)) entry_title = self.ENTRY_TITLE_FMT.format( target=get_seq_display_name(pair.target_seq), template=get_seq_display_name(pair.template_seq)) prime_config[ENTRY_TITLE_KEY] = entry_title @tasks.postprocessor def _checkFirstLogFile(self): self._checkLogFile(self.input.prime_input_files[0])
[docs]class ConsensusHomologyModelingTask(BaseHomologyTaskMixin, jobtasks.CmdJobTask): input_file: tasks.TaskFile job_config = _get_hm_job_config() @tasks.preprocessor def _writeInput(self): if not self.input.target_template_pairs: return self.input_file = self.name + HOMOLOGY_INP_EXT # Consensus output is maegz self.output_file = _get_file_base(self.input_file) + "-out.maegz" prime_config = prime_wrapper.ConsensusConfig() first_ali_name = self.input.target_template_pairs[ 0].getPrimeTemplateName() file_name = self.getTaskFilename(first_ali_name + '.aln') for idx, pair in enumerate(self.input.target_template_pairs): do_append = bool(idx) prime_alignment = pair.writeInputForConsensus(file_name=file_name, append=do_append) if idx == 0: prime_config.setAlignment(prime_alignment) model_name = self._getModelName() # TODO PRIME-4731: consensus homology doesn't support whitespace model_name = model_name.replace(" ", "_") prime_config[ENTRY_TITLE_KEY] = model_name with open(self.getTaskFilename(self.input_file), 'w') as fh: prime_config.writeInputFile(fh, ignore_none=True, yesno=False)
[docs] def makeCmd(self): """ @overrides: tasks.AbstractCmdTask """ return ['consensus_homology', self.input_file]
[docs]def get_possible_templates(aln): """ Generator for sequences from the given alignment that can be templates :rtype: collections.abc.Iterable[sequence.ProteinSequence] """ for seq in get_selected_nonref_seqs(aln): if seq.hasStructure(): yield seq
[docs]def get_selected_nonref_seqs(aln): """ Generator for non-reference sequences from the given alignment that are selected (or all non-reference seqs if there is no selection) :rtype: collections.abc.Iterable[sequence.ProteinSequence] """ selected_seqs = aln.seq_selection_model.getSelection() selected_seqs.discard(aln.getReferenceSeq()) for seq in itertools.islice(aln, 1, None): if type(seq) != sequence.ProteinSequence: # TODO MSV-1504 once NucleicAcidSequence doesn't inherit # ProteinSequence, change to isinstance # Skip nucleic acid sequences return if seq in selected_seqs or ( not selected_seqs and (not seq.hasStructure() or seq.getStructure().property.get(HOMOLOGY_PROP) is None)): yield seq
[docs]def get_min_alignment_quality(pairs): """ :param pairs: Target template pairs :type pairs: collections.abc.iterable[TargetTemplatePair] :return: The minimum alignment quality of all alignment pairs or None if no alignment pairs exist :rtype: constants.AlignmentQuality or NoneType """ qualities = (pair.alignment_quality for pair in pairs) return min((qual for qual in qualities if qual is not None), default=None)
[docs]def get_task_from_model(gui_model): current_input = gui_model.current_page.homology_modeling_input current_mode = current_input.mode if current_mode is Mode.HETEROMULTIMER: het_settings = gui_model.heteromultimer_settings task = HeteromultimerHomologyModelingTask() task.input.settings.setValue(het_settings.settings) for page in het_settings.selected_pages: # TODO MSV-3239 in standalone mode, deepcopying the sequences # causes the template sequences to lose the associated structure input_copy = copy.deepcopy(page.homology_modeling_input) task.input.input_list.append(input_copy) else: task = get_task(current_input) return task
[docs]def get_task(hm_input): """ Return a task for the specified input """ TaskCls = get_task_class(hm_input.mode) task = TaskCls() task.input.setValue(hm_input) return task
[docs]def get_task_class(mode): if mode in (Mode.ONE_ONE, Mode.HOMOMULTIMER, HeteromultimerMode.ONE_ONE): TaskCls = HomologyModelingTask elif mode in (Mode.CHIMERA, HeteromultimerMode.CHIMERA): TaskCls = ChimeraHomologyModelingTask elif mode is Mode.MANY_ONE: TaskCls = BatchHomologyModelingTask elif mode is Mode.CONSENSUS: TaskCls = ConsensusHomologyModelingTask else: raise ValueError( f"Task class has not been implemented for mode {str(mode)}") return TaskCls