Source code for schrodinger.protein.tasks.kinase

import csv
import os

from schrodinger import structure
from schrodinger.models import parameters
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.annotation import KinaseConservation
from schrodinger.protein.annotation import KinaseFeatureLabel
from schrodinger.structutils import analyze
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import csv_unicode

try:
    from schrodinger.application.prime.packages import kinase_annotation
except ImportError:
    kinase_annotation = None


class KinaseFeatureTask(tasks.ComboSubprocessTask):
    """
    Task that passes a protein sequence to the kinase annotation backend and
    stores the single-letter feature codes it returns.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR

    class Input(parameters.CompoundParam):
        # Sequence to annotate; gap characters are removed before the
        # sequence string is handed to the backend.
        seq: sequence.ProteinSequence = None

    class Output(parameters.CompoundParam):
        # Backend annotation codes; left unset when the backend rejects the
        # sequence.
        annotation: list

    # Translation from the backend's one-character codes to feature labels.
    BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE = {
        'G': KinaseFeatureLabel.GLYCINE_RICH_LOOP,
        'A': KinaseFeatureLabel.ALPHA_C,
        'K': KinaseFeatureLabel.GATE_KEEPER,
        'H': KinaseFeatureLabel.HINGE,
        'L': KinaseFeatureLabel.LINKER,
        'R': KinaseFeatureLabel.HRD,
        'C': KinaseFeatureLabel.CATALYTIC_LOOP,
        'D': KinaseFeatureLabel.DFG,
        'T': KinaseFeatureLabel.ACTIVATION_LOOP,
        '~': KinaseFeatureLabel.NO_FEATURE,
    }

    def mainFunction(self):
        """
        Run the backend on the gapless, upper-cased sequence string and
        store the annotation it returns in `self.output.annotation`.
        """
        input_seq = self.input.seq
        gapless_upper = str(input_seq).replace(input_seq.gap_char, "").upper()
        backend = kinase_annotation.KinaseAnnotation
        try:
            backend_result = backend.process_annotation(gapless_upper)
        except RuntimeError:
            # Backend raises RuntimeError for non-kinase sequences
            return
        # The backend returns (gapless sequence, annotation, annotation head);
        # only the annotation is kept.
        _, feature_codes, _ = backend_result
        self.output.annotation = feature_codes

    def getKinaseFeatures(self):
        """
        Translate the stored backend annotation into a mapping from each
        non-gap residue of the input sequence to its kinase feature label.

        :return: Residue-to-feature mapping; empty when there is no
            annotation.
        :rtype: dict
        """
        feature_codes = self.output.annotation
        if not feature_codes:
            return {}
        code_to_label = self.BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE
        labels = [code_to_label[code] for code in feature_codes]
        # The backend saw the sequence without gaps, so pad the labels at
        # every gap position to line them up with the original sequence.
        for position, res in enumerate(self.input.seq):
            if res.is_gap:
                labels.insert(position, KinaseFeatureLabel.NO_FEATURE)
        return {
            res: label
            for res, label in zip(self.input.seq, labels)
            if not res.is_gap
        }
class KinaseConservationTask(jobtasks.CmdJobTask):
    """
    Job task that runs the kinase binding site conservation analysis script
    for a receptor sequence and a ligand ASL.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR

    class Input(parameters.CompoundParam):
        # Receptor sequence; must carry a structure and an entry ID
        seq: sequence.ProteinSequence = None
        # ASL expression that selects the ligand in the receptor structure
        ligand_asl: str

    # Name of the structure file written into the task directory
    _receptor_fname = "receptor.maegz"
    # Name of the CSV file that is read back after the job
    out_fname = "receptor_conservation.csv"

    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _checkInputs(self):
        """
        Validate the inputs before the task directory is created.

        :return: `(False, message)` for the first failed check, otherwise
            None.
        """
        receptor_seq = self.input.seq
        if receptor_seq is None:
            return False, "Receptor sequence must be set"
        if not (receptor_seq.hasStructure() and receptor_seq.entry_id):
            return False, "Receptor sequence must have structure"
        ligand_asl = self.input.ligand_asl
        if not ligand_asl:
            return False, "Ligand ASL must be set"
        matched_atoms = analyze.evaluate_asl(receptor_seq.getStructure(),
                                             ligand_asl)
        if not matched_atoms:
            return False, f"Ligand {self.input.ligand_asl} was not found"

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _writeInputFile(self):
        """
        Write the receptor structure into the task directory.
        """
        receptor_path = self.getTaskFilename(self._receptor_fname)
        self.input.seq.getStructure().write(receptor_path)

    def makeCmd(self):
        """
        Build the command line for the conservation analysis script.

        :rtype: list[str]
        """
        cmd = [
            '$SCHRODINGER/run', '-FROM', 'psp',
            'kinase_conservation_analysis.py',
            self._receptor_fname,
            '-receptor_chain', self.input.seq.structure_chain,
            '-ligand_asl', self.input.ligand_asl
        ]  # yapf: disable
        return cmd

    def _getOutputFile(self):
        """
        Return the path of the conservation CSV inside the task directory.
        """
        return self.getTaskFilename(self.out_fname)

    @tasks.postprocessor
    def _checkOutputFile(self):
        """
        Report a failure when the expected output file does not exist.
        """
        outfile = self._getOutputFile()
        if os.path.isfile(outfile):
            return None
        return False, f"Output file not found: {outfile}"

    def parseOutput(self):
        """
        Parse this task's output file. See parse_kinase_conservation for
        documentation.
        """
        return parse_kinase_conservation(self._getOutputFile(),
                                         self.input.seq)
def parse_kinase_conservation(outfile: str, seq: sequence.ProteinSequence):
    """
    Read the conservation output file and associate each row with the
    matching residue of `seq`.

    The first four lines of the file are skipped as decoration; the rest is
    read as CSV with a header row. A row is ignored when its residue ID
    cannot be found in the structure or when the sequence has no residue for
    the resulting key. A file that ends before the decorative lines are over
    produces an empty result instead of leaking `StopIteration`.

    :param outfile: Path of the conservation CSV file to read.
    :param seq: Sequence whose structure and entry ID are used to resolve
        the residue IDs listed in the file.
    :return: Conservation category for every residue that could be resolved.
    :rtype: dict[residue.Residue, annotation.KinaseConservation]
    """
    sres_id_key = 'ResidueID_in_ct(ChainID:ResidueNumberInscode)'
    conservation_key = 'Conservation_category'
    num_decorative_lines = 4
    st = seq.getStructure()
    results = {}
    with csv_unicode.reader_open(outfile) as fh:
        for _ in range(num_decorative_lines):
            # Lines are strings, so None can only mean the file ran out.
            if next(fh, None) is None:
                return results
        for row in csv.DictReader(fh):
            try:
                sres = st.findResidue(row[sres_id_key])
            except ValueError:
                # Residue ID from the file is not present in the structure
                continue
            res_key = residue.get_structure_residue_key(sres, seq.entry_id)
            res = seq.getResByKey(res_key)
            if res is None:
                continue
            results[res] = KinaseConservation(row[conservation_key])
    return results
class KinaseFeatureAndConservationTask(KinaseFeatureTask):
    """
    Task that computes kinase features and then, for kinase sequences, runs
    the kinase conservation analysis.
    """

    class Input(parameters.CompoundParam):
        seq: sequence.ProteinSequence = None
        # Copies of the sequence's structure and entry ID, filled in by
        # _checkInputs because they don't survive json
        st: structure.Structure = None
        entry_id: int = None
        ligand_asl: str

    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _checkInputs(self):
        """
        Validate the inputs and stash the structure and entry ID on the
        input.

        :return: `(False, message)` for the first failed check, otherwise
            None.
        """
        receptor_seq = self.input.seq
        if receptor_seq is None:
            return False, "Receptor sequence must be set"
        if not (receptor_seq.hasStructure() and receptor_seq.entry_id):
            return False, "Receptor sequence must have structure"
        if not self.input.ligand_asl:
            return False, "Ligand ASL must be set"
        receptor_st = receptor_seq.getStructure()
        if not analyze.evaluate_asl(receptor_st, self.input.ligand_asl):
            return False, f"Ligand {self.input.ligand_asl} was not found"
        # seq.getStructure() and entry ID don't survive json so store
        self.input.st = receptor_st
        self.input.entry_id = self.input.seq.entry_id

    def mainFunction(self):
        """
        Compute kinase features unless the sequence is already annotated,
        then run the conservation task if the sequence is a kinase.
        """
        seq = self.input.seq
        if not seq.is_kinase_annotated:
            # Compute kinase features
            super().mainFunction()
            if not self.output.annotation:
                # Not a kinase
                return
        elif not seq.isKinaseChain():
            return

        conservation_task = KinaseConservationTask()
        task_inputs = self.input.toDict()
        # Reattach the stored structure and entry ID to the sequence and
        # drop the keys that the conservation task input does not declare.
        stored_st = task_inputs.pop("st")
        seq._get_structure = lambda: stored_st
        stored_entry_id = task_inputs.pop("entry_id")
        seq.entry_id = stored_entry_id
        assert seq.hasStructure()
        assert seq.entry_id
        conservation_task.input.setValue(task_inputs)
        assert conservation_task.input.seq.hasStructure()
        # Run in this task's directory so the output file can be found by
        # parseConservationOutput.
        conservation_task.specifyTaskDir(self.getTaskDir())
        conservation_task.start()
        conservation_task.wait()  # OK: subprocess
        if conservation_task.status is conservation_task.FAILED:
            self.failure_info = conservation_task.failure_info

    def parseConservationOutput(self):
        """
        Parse the conservation output written into this task's directory.
        See parse_kinase_conservation for documentation.
        """
        outfile = self.getTaskFilename(KinaseConservationTask.out_fname)
        return parse_kinase_conservation(outfile, self.input.seq)