Source code for schrodinger.protein.tasks.pfam

import copy
import os

from schrodinger.application.msv import seqio
from schrodinger.models import parameters
from schrodinger.protein import alignment
from schrodinger.protein import sequence
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks


class PfamTask(jobtasks.CmdJobTask):
    """
    Command job task that runs the ``pfam`` backend on one protein sequence.

    Before the job starts, two preprocessors write ``<task name>.fasta`` and
    ``<task name>.inp`` to the paths given by `getTaskFilename`. After the
    job finishes, a postprocessor reads ``<task name>.out`` and stores the
    parsed results on `output`.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR
    backend_name = 'pfam'

    class Input(parameters.CompoundParam):
        # Query sequence. The task deep-copies it and strips gaps from the
        # copy before writing the FASTA file, so the caller's object is not
        # modified.
        seq: sequence.ProteinSequence = None

    class Output(jobtasks.CmdJobTask.Output):
        # Concatenated per-row codes parsed from the backend output file.
        pfam: str
        # Value of the ``s_psp_query_family_name`` field in the output file.
        name: str

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _createInputFasta(self):
        """
        Write the gap-free input sequence to ``<task name>.fasta``.
        """
        inp_file_name = self.getTaskFilename(self.name + '.fasta')
        # Work on a copy so that removing gaps does not alter the input.
        seq = copy.deepcopy(self.input.seq)
        seq.removeAllGaps()
        aln = alignment.ProteinAlignment([seq])
        seqio.FastaAlignmentWriter.write(aln, inp_file_name)

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _createJobParamsFile(self):
        """
        Write the backend parameter file ``<task name>.inp``, which points
        the backend at the FASTA file and requests ``m2io`` formatted output.
        """
        job_file_name = self.getTaskFilename(self.name + '.inp')
        fasta_fname = self.getTaskFilename(self.name + '.fasta')
        contents = '\n'.join([
            f'QUERY_FILE "{fasta_fname}"',
            'FORMAT m2io'
        ])  # yapf: disable
        with open(job_file_name, 'w') as job_file:
            # `contents` is a single string, so use write(); writelines()
            # would iterate over it one character at a time.
            job_file.write(contents)

    def makeCmd(self):
        """
        @overrides: tasks.AbstractCmdTask
        """
        return ['pfam', self.name]

    @tasks.postprocessor
    def _incorporateResults(self):
        """
        Parse ``<task name>.out`` and store the results on `output`.

        :return: ``(False, "No output produced")`` if the output file does
            not exist; otherwise None.
        """
        pfam_out_fname = self.getTaskFilename(self.name + '.out')
        if not os.path.isfile(pfam_out_fname):
            # The backend returns 0 even if there's no output
            return False, "No output produced"
        pfam, pfam_name = _extract_pfam_from_mmio_file(pfam_out_fname)
        self.output.pfam = pfam
        self.output.name = pfam_name
def _extract_pfam_from_mmio_file(mmio_fname):
    """
    Extract the pfam string and the family name from a pfam output file.

    This is a hand-rolled line scanner (originally copied from MSV1), not a
    general m2io parser. Only the second block whose header line contains
    ``m_psp_seq`` is read. Inside that block, lines containing ``:::`` act as
    separators that split the text into consecutive sections:

    0. field names, one per line
    1. field values, paired with the field names by position (surplus
       lines are ignored)
    2. rows whose second whitespace-separated token is a quoted code
       (presumably one row per residue); an empty or blank code is stored
       as a single space

    Scanning stops at the first line following the third separator.
    Whitespace-only rows in section 2 are skipped.

    :param mmio_fname: Path of the file to read.
    :type mmio_fname: str

    :return: The concatenated codes and the value of the
        ``s_psp_query_family_name`` field.
    :rtype: tuple(str, str)

    :raises KeyError: If no ``s_psp_query_family_name`` field was read.
    :raises IndexError: If a non-blank section 2 row has only one token.
    """
    field_names = []
    field_values = {}
    codes = []
    seq_blocks_seen = 0
    section = 0
    value_idx = 0
    with open(mmio_fname, "r") as pfam_file:
        # Iterate lazily so that the early `break` below stops reading.
        for line in pfam_file:
            if "m_psp_seq" in line:
                seq_blocks_seen += 1
                continue
            if seq_blocks_seen != 2:
                continue
            if ":::" in line:
                section += 1
                continue
            if section == 0:
                field_names.append(line.strip())
            elif section == 1:
                if value_idx < len(field_names):
                    field_values[field_names[value_idx]] = line.strip(' "\n')
                    value_idx += 1
            elif section == 2:
                tokens = line.split()
                if not tokens:
                    # Tolerate blank rows instead of failing on tokens[1].
                    continue
                # A quoted space (`" "`) splits into two lone quote tokens,
                # so stripping the quotes leaves an empty string; map that
                # back to the space it represents.
                code = tokens[1].replace('"', '')
                codes.append(code or ' ')
            else:
                break
    return "".join(codes), field_values["s_psp_query_family_name"]