Source code for schrodinger.application.msv.seqio

import collections
import contextlib
import csv
import itertools
import os
import re
import warnings
from functools import partial

import requests
from Bio.Seq import Seq as BioSeq
from Bio.SeqRecord import SeqRecord as BioSeqRecord
from requests.exceptions import HTTPError
# We don't currently support SSL verification
from requests.packages.urllib3.exceptions import InsecureRequestWarning

from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.protein import alignment
from schrodinger.protein import getpdb_utility
from schrodinger.protein import residue
from schrodinger.protein import seqres
from schrodinger.protein import sequence
from schrodinger.protein.annotation import ProteinSequenceAnnotations as PSAnno
from schrodinger.structutils import analyze
from schrodinger.structutils.interactions.ssbond import get_disulfide_bonds
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

FASTA_COMMENT_CHARS = {'>', ';'}
PERMISSIVE_CHAIN_RE = re.compile(
    r"""
    ^(?P<name>\S+)     # Non-whitespace name
    [-.:_]             # Separator
    (?P<id>[A-Za-z])$  # Single letter chain
    """, re.VERBOSE)
UNIPROT_RE = re.compile(  # https://www.uniprot.org/help/accession_numbers
    r"^[OPQ][0-9][A-Z0-9]{3}[0-9]$|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}$",
    re.IGNORECASE)
SWISS_PROT_RE = re.compile(  # https://www.uniprot.org/help/entry_name
    r"[A-Z0-9]{1,5}_[A-Z0-9]{2,5}$", re.IGNORECASE)
HAS_DIGIT_RE = re.compile(r"\d")
# This dict is keyed by a string to be used in header strings.
EXPORT_ANNOTATIONS = {'|SSA': PSAnno.ANNOTATION_TYPES.secondary_structure}
NAME_FLAG = "NAME:"
CHAIN_FLAG = "CHAIN:"
NAME_STR = "%s{name}|%s{chain}" % (NAME_FLAG, CHAIN_FLAG)
SIM_STR = "|ID:{id:.2F}|SIM:{sim:.2f}|HOM:{hom:.2f}"
NCBI_FLAGS = {
    "gb", "emb", "dbj", "pir", "prf", "pdb", "pat", "bbs", "gnl", "ref", "lcl",
    'sp', 'Swiss-Prot'
}
FetchIDs = collections.namedtuple("FetchIDs", "pdb entrez uniprot")

INVALID_SSA_VAL = object()
SSA_MAP = {
    'C': structure.SS_NONE,
    'H': structure.SS_HELIX,
    'E': structure.SS_STRAND,
    '-': None,
}
SSA_MAP_RV = {v: k for k, v in SSA_MAP.items()}

SEQD_REQUIRED_COLUMNS = ('ResID', 'Chain', 'ResName')


class SequenceWarning(UserWarning):
    """
    Custom warning for problems loading sequences
    """


class catch_sequence_warnings(contextlib.ExitStack):
    """
    Filter SequenceWarnings and store them on the instance
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._caught_warnings = []
        self.message = ""

    def __enter__(self):
        self._caught_warnings = self.enter_context(
            warnings.catch_warnings(record=True))
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        super().__exit__(exc_type, exc_value, traceback)
        if exc_type is None:
            msg_parts = []
            for cur_warning in self._caught_warnings:
                if isinstance(cur_warning.message, SequenceWarning):
                    msg_parts.append(str(cur_warning.message))
                else:
                    warnings.showwarning(cur_warning.message,
                                         cur_warning.category,
                                         cur_warning.filename,
                                         cur_warning.lineno)
            self.message = "\n".join(msg_parts)
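# Illustrative sketch (not part of the original module): collecting
# SequenceWarning messages raised while converting a hypothetical structure
# `st` into sequences. `StructureConverter` is defined later in this module.
#
#     with catch_sequence_warnings() as warning_catcher:
#         seqs = StructureConverter.convert(st)
#     if warning_catcher.message:
#         print(warning_catcher.message)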
def _partition_by_predicate(arr, pred):
    """
    Utility function to group a list into sublists. Each sublist begins with
    an element that matches the supplied predicate; or, if none exist, the
    sublist will only contain elements that don't match the predicate.

    For example, partitioning by even values would map [1, 2, 3, 4, 5, 6] to
    [1], [2, 3], [4, 5], [6]. Note that [1] is yielded despite 1 not being an
    even number, because there was no even number before 1.

    Note also that many file reading functions below would benefit from using
    this function.

    :param arr: A list to split into sublists
    :type arr: list

    :param pred: A function that takes a list item and returns True if the
        list item meets a criterion and False otherwise
    :type pred: function
    """
    sublist = []
    for item in arr:
        if pred(item) and sublist:
            yield sublist
            sublist = []
        sublist.append(item)
    if sublist:
        yield sublist


################################################################################
# Downloaders
################################################################################
class GetSequencesException(IOError):
    """
    Custom Exception for problems retrieving sequences.
    """


PdbParts = collections.namedtuple("PdbParts", ("pdbcode", "pdbchain"))
FastaParts = collections.namedtuple("FastaParts",
                                    ("name", "long_name", "chain", "anno_type"))
def make_maestro_pdb_id(pdb_id):
    """
    Convert a PDB ID to ":"-separated PDB code and PDB chain (e.g. 4hhb if
    chain is blank, or 4hhb:A)

    :param pdb_id: PDB ID with optional chain, e.g. 4hhb, 4hhbA, 4hhb:A, 4hhb_A
    :type pdb_id: str

    :return: PDB ID with ":" between PDB code and PDB chain
    :rtype: str
    """
    pdbparts = parse_pdb_id(pdb_id)
    pdb_part_list = [pdbparts.pdbcode.lower()]
    if pdbparts.pdbchain != "":
        pdb_part_list.append(pdbparts.pdbchain)
    return ":".join(pdb_part_list)
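# Hypothetical usage sketch for make_maestro_pdb_id (the example IDs are
# illustrative; the expected output follows from the parsing rules below):
#
#     >>> make_maestro_pdb_id("4HHB_A")
#     '4hhb:A'
#     >>> make_maestro_pdb_id("4hhb")
#     '4hhb'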
[docs]def parse_pdb_id(pdb_id, permissive=False): """ Parse a PDB ID into a (pdb code, pdb chain) Named tuple. :param pdb_id: PDB ID with optional chain, e.g. 4hhb, 4hhbA, 4hhb:A, 4hhb_A :type pdb_id: str :param permissive: Whether to use permissive parsing. In strict mode, PDB ID must be 4 characters starting with a digit and single-letter chain is optional. In permissive mode, PDB ID can contain any non-whitespace characters but chain separator and single-letter chain are required. :type permissive: bool :return: Named tuple of (pdbcode, pdbchain) :type: PdbParts :raises GetSequencesException: if pdb_id can't be parsed """ try: return _parse_pdb_id(pdb_id) except GetSequencesException: if not permissive: raise else: match = PERMISSIVE_CHAIN_RE.match(pdb_id) if match: groupdict = match.groupdict() return PdbParts(groupdict["name"], groupdict["id"]) raise
def _parse_pdb_id(pdb_id): """ Parse a PDB ID into a (pdb code, pdb chain) Named tuple. :param pdb_id: PDB ID with optional chain, e.g. 4hhb, 4hhbA, 4hhb:A, 4hhb_A PDB code must be 4 characters starting with a digit. Chain must be blank or a single letter. :type pdb_id: str :return: Named tuple of (pdbcode, pdbchain) :type: PdbParts :raises GetSequencesException: if pdb_id can't be parsed """ pdb_id = pdb_id.strip() if not 4 <= len(pdb_id) <= 6: raise GetSequencesException(f"Wrong length PDB code: {pdb_id}") if not pdb_id[0].isdigit(): raise GetSequencesException( f"First character of a PDB ID must be a digit: {pdb_id}") if not re.match(r"^[0-9a-zA-Z]{3}$", pdb_id[1:4]): raise GetSequencesException( "Last 3 characters of a PDB ID must be digits or ASCII letters: " f"{pdb_id}") pdbparts = PdbParts(pdbcode=pdb_id[:4], pdbchain="") # A PDB code has length 4 if len(pdb_id) == 4: return pdbparts else: # If there are 5 or 6 characters, the last should be the chain ID pdbparts = pdbparts._replace(pdbchain=pdb_id[-1]) if not pdbparts.pdbchain.isalnum(): raise GetSequencesException("Invalid PDB chain: %s" % pdb_id) # With length 6, there should be a non-alnum separator elif len(pdb_id) == 6 and pdb_id[4].isalnum(): raise GetSequencesException("Invalid PDB code: %s" % pdb_id) return pdbparts
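# Hypothetical usage sketch for parse_pdb_id. Strict parsing accepts a
# 4-character PDB code with an optional single-letter chain; permissive
# parsing additionally accepts "<name><separator><chain>" identifiers.
# The identifiers below are illustrative:
#
#     >>> parse_pdb_id("1abcA")
#     PdbParts(pdbcode='1abc', pdbchain='A')
#     >>> parse_pdb_id("my_protein_A", permissive=True)
#     PdbParts(pdbcode='my_protein', pdbchain='A')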
[docs]def get_valid_pdb_id_map_for_seqs(seqs, structureless_only=True): """ For a list of sequences return a map of valid PDB IDs to sequences. :param seqs: List of sequences to get the map for :type seqs: list(sequence.Sequence) :param structureless_only: Whether to only return structureless seqs :type structureless_only: bool :return: Map of valid PDB IDs to their source sequence :rtype: dict(str: sequence.Sequence) """ valid_pdb_id_map = {} for seq in seqs: if structureless_only and seq.hasStructure(): continue for candidate_id in [seq.pdb_id, seq.name, seq.long_name]: if candidate_id is None: continue try: pdbparts = parse_pdb_id(candidate_id, True) except GetSequencesException: continue candidate_id = pdbparts.pdbcode + pdbparts.pdbchain if len(candidate_id) == 4 and seq.chain: candidate_id += seq.chain valid_pdb_id_map[candidate_id] = seq break return valid_pdb_id_map
def valid_pdb_id(pdb_id: str) -> bool:
    """
    :return: Whether the ID appears to be a valid PDB ID
    """
    try:
        _parse_pdb_id(pdb_id)
    except GetSequencesException:
        return False
    return True
[docs]def valid_entrez_id(entrez_id: str) -> bool: """ Entrez ID may be: 1) NCBI Accession number: 9 or 12 characters starting with any letter, followed by `"P_"`, ending with 6 or 9 numbers and an optional number following a period (ex. NP_123456, XP_123456789.1) 2) NCBI GenInfo identifier: A single 9-digit number (ex. 123456789). :return: Whether the ID appears to be a valid Entrez ID """ entrez_id = entrez_id.strip() ch_id = '' if '.' in entrez_id: parts = entrez_id.split('.') if len(parts) != 2: # Too many '.' chars return False entrez_id, ch_id = parts if not ch_id.isdigit(): return False id_len = len(entrez_id) if id_len not in [9, 12]: return False if id_len == 9 and entrez_id.isdigit(): if ch_id: return False return True elif not entrez_id[0].isalpha(): return False elif not entrez_id[1:3].upper() == "P_": return False elif not entrez_id[3:].isdigit(): return False return True
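# Hypothetical usage sketch for valid_entrez_id (the IDs below are
# illustrative, not taken from the original source):
#
#     >>> valid_entrez_id("NP_123456")
#     True
#     >>> valid_entrez_id("123456789")
#     True
#     >>> valid_entrez_id("XP_123456789.1")
#     True
#     >>> valid_entrez_id("4hhb")
#     False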
def valid_uniprot_id(uniprot_id: str) -> bool:
    """
    UniProt ID must be 6 characters or 10 characters starting with a letter

    :return: Whether the ID appears to be a valid UniProt ID
    """
    uniprot_id = uniprot_id.strip()
    return bool(UNIPROT_RE.match(uniprot_id))


def valid_swiss_prot_name(swiss_prot_name: str) -> bool:
    """
    Swiss-Prot entry name must be of the form X_Y, where X and Y are at most
    5 alphanumeric characters and the underscore serves as a separator. We
    also require Y to be a minimum of 2 characters to avoid confusion with a
    PDB ID.

    :return: Whether the name appears to be a valid Swiss-Prot entry name
    """
    swiss_prot_name = swiss_prot_name.strip()
    return bool(SWISS_PROT_RE.match(swiss_prot_name))
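# Hypothetical usage sketch for the UniProt validators (accession "P69905"
# and entry name "HBA_HUMAN" are illustrative examples):
#
#     >>> valid_uniprot_id("P69905")
#     True
#     >>> valid_swiss_prot_name("HBA_HUMAN")
#     True
#     >>> valid_swiss_prot_name("4HHB")
#     False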
[docs]def process_fetch_ids(ids, *, dialog_parent, allow_pdb=True): """ Convenience method to parse a list or comma-separated strings into valid sequence and/or structure identifiers. If any IDs can't be identified, prompt the user to continue. :param ids: Database ID or IDs (comma-separated str or list) :type ids: str or list :param dialog_parent: Parent to show dialog box :type dialog_parent: QtWidgets.QWidget :param allow_pdb: Whether to allow structure identifiers. If False, they will be treated as unidentified. :type allow_pdb: bool :return: Namedtuple of IDs identified as PDB, entrez, uniprot; or None if there are unidentified IDs and the user cancels. :rtype: FetchIDs or NoneType """ error_ids = [] if isinstance(ids, str): ids = ids.split(",") ids = [word.strip() for word in ids] pdb_like_ids = [] entrez_like_ids = [] uniprot_like_ids = [] for db_id in ids: if valid_entrez_id(db_id): entrez_like_ids.append(db_id) elif (valid_uniprot_id(db_id) or valid_swiss_prot_name(db_id)): uniprot_like_ids.append(db_id) elif allow_pdb and valid_pdb_id(db_id): pdb_like_ids.append(db_id) else: error_ids.append(db_id) if error_ids: from schrodinger.ui.qt import messagebox ok = messagebox.show_question( dialog_parent, "Some of the entered codes are invalid: " f"{', '.join(error_ids)}. " "Only the valid codes will be fetched. Continue?", title="Fetch") if not ok: return None return FetchIDs(pdb_like_ids, entrez_like_ids, uniprot_like_ids)
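# Illustrative sketch for process_fetch_ids. IDs are classified in the order
# Entrez, UniProt/Swiss-Prot, then PDB; the dialog parent is only used when
# some IDs cannot be classified. The IDs below are placeholders:
#
#     >>> process_fetch_ids("4hhb, P69905, NP_123456", dialog_parent=None)
#     FetchIDs(pdb=['4hhb'], entrez=['NP_123456'], uniprot=['P69905'])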
[docs]def maestro_get_pdb(maestro_pdb_id, pdb_dir=None, remote_ok=False): """ Download a PDB file. If specified, the chain will be split out into a separate file. :param maestro_pdb_id: 4-letter PDB code or code:chain (e.g. 4hhb or 4hhb:A) :type maestro_pdb_id: str :param pdb_dir: directory to check for existing files and destination to download new files :type pdb_dir: str :param remote_ok: whether it's okay to make a remote query. :type remote_ok: bool :return: downloaded PDB path :rtype: str :raises GetSequencesException: if pdb file can't be downloaded """ file_pdb_id = maestro_pdb_id.replace(":", "_") if pdb_dir is None: pdb_dir = os.getcwd() pdb_path = os.path.join(pdb_dir, "%s.pdb" % file_pdb_id) if not os.path.isfile(pdb_path): with fileutils.chdir(pdb_dir): args = [maestro_pdb_id] if not remote_ok: args.append('-l') error_code = getpdb_utility.main(args) if error_code: # Remove downloaded file with error fileutils.force_remove(pdb_path) raise GetSequencesException("An error occurred downloading %s" % file_pdb_id) if not os.path.isfile(pdb_path): raise GetSequencesException("File not downloaded: %s" % pdb_path) return pdb_path
[docs]class SeqDownloader(object): ENTREZ_FORMAT_STR = ( "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?" + "db=protein&rettype=fasta&id={ID}") UNIPROT_FORMAT_STR = "https://www.uniprot.org/uniprot/{ID}.{EXT}"
[docs] @classmethod def downloadPDB(cls, pdb_id, pdb_dir=None, remote_ok=False): """ Parse PDB ID string and download PDB file. :param pdb_id: PDB ID with optional chain (e.g. 4hhb, 4hhbA, 4hhb:A) :type pdb_id: str :param pdb_dir: directory to check for existing files and destination to download new files :type pdb_dir: str :param remote_ok: whether it's okay to make a remote query. :type remote_ok: bool :return: Full path to downloaded PDB path :type: str :raises GetSequencesException: if pdb file can't be downloaded """ maestro_pdb_id = make_maestro_pdb_id(pdb_id) return maestro_get_pdb(maestro_pdb_id, pdb_dir=pdb_dir, remote_ok=remote_ok)
[docs] @classmethod def downloadEntrezSeq(cls, sequence_id, remote_ok): """ Download a sequence from Entrez database. :param sequence_id: Sequence ID in Entrez format. :type sequence_id: str :param remote_ok: whether it's okay to make a remote query. :type remote_ok: bool :return: Full path to downloaded fasta file :rtype: str """ fasta_path = cls._downloadSeqFromUrl(cls.ENTREZ_FORMAT_STR, sequence_id, remote_ok) return fasta_path
    @classmethod
    def downloadUniprotSeq(cls, sequence_id, remote_ok, *, use_xml=False):
        """
        Download a sequence from Uniprot database.

        :param sequence_id: Sequence ID in Uniprot format.
        :type sequence_id: str

        :param remote_ok: whether it's okay to make a remote query.
        :type remote_ok: bool

        :param use_xml: whether to get the xml file with the full UniProt
            annotation information (e.g. domains). Setting this to True will
            download the xml file *instead* of the FASTA file.
        :type use_xml: bool

        :return: Full path to downloaded fasta or xml file
        :rtype: str
        """
        ext = "xml" if use_xml else "fasta"
        path = cls._downloadSeqFromUrl(cls.UNIPROT_FORMAT_STR,
                                       sequence_id,
                                       remote_ok,
                                       ext=ext)
        return path
@staticmethod def _downloadSeqFromUrl(url_format_str, sequence_id, remote_ok, *, ext="fasta"): """ Downloads a sequence using a url_format_str and a seq ID :param url_format_str: Server url format string :type url_format_str: str :param sequence_id: The sequence ID based on what DB we're req'ing from :type sequence_id: str :param remote_ok: whether it's okay to make a remote query. :type remote_ok: bool :param ext: File extension :type ext: str :return: The full path of the saved file or None if the file couldn't be downloaded :rtype: str or NoneType :raises GetSequencesException: if a bad response is received from the db hit or the response times out. """ file_name = f"{sequence_id}.{ext}" # If file already exists, just load it, otherwise query the database full_path = os.path.abspath(file_name) if not os.path.isfile(full_path): if not remote_ok: return None url = url_format_str.format(ID=sequence_id, EXT=ext) try: resp = requests.get(url, timeout=30.0, stream=True, verify=False) resp.raise_for_status() except HTTPError: raise GetSequencesException('Bad response. Status code: %d' % resp.status_code) except requests.exceptions.Timeout: raise GetSequencesException('Request took too long to respond.') except requests.exceptions.RequestException as exc: raise GetSequencesException from exc else: if resp.text == '': raise GetSequencesException('Empty response returned') with open(full_path, "wb") as out_file: for chunk in resp.iter_content(chunk_size=1024): if chunk: # Filter out empty keep-alive chunks out_file.write(chunk) return full_path
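# Illustrative sketch (performs network I/O when remote_ok=True; the UniProt
# accession and PDB ID below are placeholders). Downloaded files are cached in
# the working directory and reused on subsequent calls:
#
#     fasta_path = SeqDownloader.downloadUniprotSeq("P69905", remote_ok=True)
#     pdb_path = SeqDownloader.downloadPDB("4hhb:A", remote_ok=True)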
################################################################################
# Readers
################################################################################
def read_sequences(filename):
    """
    Read sequences from the filename. Format is detected from the file
    extension.

    Note that this function is only used for non-structure filetypes. For
    structure filetypes, see the StructureConverter class.

    :param filename: Path to sequence file
    :type filename: str

    :rtype: list
    :return: A list of sequences in the file
    """
    if (fileutils.get_sequence_file_format(filename) is
            fileutils.SeqFormat.fasta):
        return list(FastaAlignmentReader.read(filename))
    elif filename.endswith(".aln"):
        return list(ClustalAlignmentReader.read(filename))
    elif filename.endswith(".seqd"):
        return SeqDReader.read(filename)
    else:
        with MMSequenceConverter() as conv:
            return conv.readSequences(filename)
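# Illustrative sketch ("query.fasta" is a hypothetical file name):
#
#     seqs = read_sequences("query.fasta")
#     for seq in seqs:
#         print(seq.name, len(seq))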
def from_biopython(biopy_seq):
    """
    Convert a Biopython sequence to a ProteinSequence

    :param biopy_seq: A Biopython sequence to convert to a ProteinSequence
    :type biopy_seq: Bio.SeqRecord.SeqRecord

    :return: The converted sequence
    :rtype: schrodinger.protein.sequence.ProteinSequence
    """
    return sequence.make_sequence(biopy_seq.seq, name=biopy_seq.name)
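# Illustrative sketch (record name and residues are placeholders; assumes
# make_sequence propagates the name, as the code above suggests):
#
#     >>> record = BioSeqRecord(BioSeq("ACDEFGHIK"), name="example")
#     >>> seq = from_biopython(record)
#     >>> seq.name
#     'example'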
class StructureConverter(object):
    """
    Reads a structure and converts it to a list of sequences.

    Note that this class produces sequences that are ordered based on residue
    number and insertion code, not connectivity. If that ever changes,
    `structure_model.MaestroStructureModel._extractChains` must also be
    updated.
    """
[docs] def __init__(self, ct, eid=None): """ :param ct: A structure to convert to sequences. :type ct: schrodinger.structure.Structure :param eid: The entry id to assign to the created sequences. If not given, the entry id from the structure will be used. :type eid: str """ self.ct = ct self.eid = eid
[docs] @classmethod def convert(cls, ct, eid=None): """ Convert the provided structure into a list of sequences. :param ct: A structure to convert to sequences. :type ct: schrodinger.structure.Structure :param eid: The entry id to assign to the created sequences. If not given, the entry id from the structure will be used. :type eid: str :return: A list of sequences, one per chain. :rtype: list[sequence.Sequence] """ converter = cls(ct, eid) return converter.makeSequences()
[docs] def makeSequences(self): """ Note that disulfide bonds might be between chains, so need to be calculated at the ct level :return: A list of sequences, one per chain. :rtype: list[sequence.Sequence] """ chains = self._extractChains() # Prepare disulfide bond maps ssbond_atoms = get_disulfide_bonds(self.ct) chain_res_ssatom_map = collections.defaultdict(dict) for pair in ssbond_atoms: for atom in pair: chain_res_ssatom_map[atom.chain][atom.getResidue()] = atom seqs, ssatom_res_map = self._convertChainsToSeqs( chains, ssatom_map=chain_res_ssatom_map) # Store inter- and intra-sequence disulfide bonds in residues for atom1, atom2 in ssbond_atoms: res1 = ssatom_res_map[atom1] res2 = ssatom_res_map[atom2] # Res will be None if the atom's res was skipped (e.g. non-protein) if (res1 is not None and res2 is not None and res1.disulfide_bond is None and res2.disulfide_bond is None): residue.add_disulfide_bond(res1, res2) return seqs
def _extractChains(self): """ Extract information from a structure and returns a list of chains as a lists of residues. This method mimics the behavior of `structure._ChainIterator` but caches information during the atom traversal to improve performance. See MSV-1317. :return: A list of lists (one for each chain) :rtype: list(list(structure._Residue)) """ ct = self.ct residue_atom_set = set( analyze.evaluate_asl(ct, "protein OR nucleic_acids")) # Atom indices that we assigned to a residue already seen_atom_indices = set() chain_res = collections.defaultdict( list) # map of chains to their residues for at in ct.atom: ai = at.index if ai in seen_atom_indices: continue if ai not in residue_atom_set: continue res = at.getResidue() resatom_indices = set(res.getAtomIndices()) seen_atom_indices.update(resatom_indices) chain_res[at.chain].append(res) # if the ordering of residues is changed here, # structure_model.MaestroStructureModel._extractChains and # _createSeqForNewChain must also be updated for chain in chain_res.values(): chain.sort(key=lambda r: (r.resnum, r.inscode)) return chain_res.values() def _isAlphaCarbon(self, atom): """ :param atom: An atom index to check. :type atom: int :return: Whether the atom is an alpha carbon :rtype: bool """ return mm.mmct_atom_get_pdbname(self.ct, atom) == ' CA ' def _convertChainsToSeqs(self, chains, ssatom_map): """ Takes a list of lists containing `structure._Residue` and converts them into `sequence.Sequence` :param chains: A list of lists, each representing a chain :type chains: list(list(structure._Residue)) :param ssatom_map: Mapping of chain name to structure residue to structure atom involved in disulfide bond. :type ssatom_map: dict(str, dict(structure._Residue, structure._Atom)) :return: A tuple with a list of sequences and a map between structure atoms in disulfide bonds and sequence elements. 
:rtype: tuple(list(sequence.Sequence), dict(structure._Atom, residue.Residue)) """ seqs = [] ssatom_res_map = {} props = self.ct.property pdb_id = props.get("s_pdb_PDB_ID") entry_name = props.get("s_m_entry_name") name, long_name = _get_name_and_description(props) protein_atoms = set(analyze.evaluate_asl(self.ct, '(protein)')) seqres_by_chain = seqres.get_seqres(self.ct) eid = self.eid or self.ct.atom[1].entry_id for sresidues in chains: chain_name = sresidues[0].chain sres_ssatom_map = ssatom_map.get(chain_name, {}) SeqClass = self._detectSequenceType(sresidues) make_res = SeqClass.makeSeqElement # TODO MSV-1504: Change this condition to # if issubclass(SeqClass,sequence.ProteinSequence) if SeqClass == sequence.ProteinSequence: kept_sresidues = [] for res in sresidues: if (res.atom[1].index in protein_atoms or res.pdbres.upper().strip() in residue.CAPPING_GROUP_ALPHABET): kept_sresidues.append(res) else: # Explicitly store None to indicate that residue was # filtered out ssatom = sres_ssatom_map.get(res) if ssatom is not None: ssatom_res_map[ssatom] = None sresidues = kept_sresidues # Add in any residues that appear in SEQRES records but aren't in # the structure if seqres_by_chain is not None and chain_name in seqres_by_chain: seqres_res = seqres_by_chain[chain_name] def get_short_code(ele_str, default=None): ele_str = ele_str.strip().upper() ele_type = SeqClass.ALPHABET.get(ele_str) if ele_type is None: return default # Compare by short code to allow for protonation differences return ele_type.short_code # Use different invalid symbols for unknown amino acids unk1, unk2 = "%", "&" seqres_str = "".join( get_short_code(ele, unk1) for ele in seqres_res) pdbres_str = "".join( get_short_code(res.pdbres, unk2) for res in sresidues) if pdbres_str in seqres_str: # Prior to 20-3, Prime populated the i_pdb_seqres_index # property with meaningless numbers for modeled loops. (See # PRIME-4376.) To avoid trying to parse these values, we # skip adding missing loops if the structured sequence is a # subset of the SEQRES. sresidues = self._addStructurelessTerminalOnly( sresidues, seqres_res, make_res) else: sresidues = self._addStructureless(sresidues, seqres_res, make_res) get_sres_key = partial(residue.get_structure_residue_key, entry_id=eid) residue_map = {} residues = [] non_unique = [] for sres in sresidues: new_res = self.convertStructResidue(sres, make_res) residues.append(new_res) if eid is not None and not isinstance(sres, residue.Residue): res_key = get_sres_key(sres) if res_key in residue_map: non_unique.append(res_key) residue_map[res_key] = new_res ssatom = sres_ssatom_map.get(sres) if ssatom is not None: ssatom_res_map[ssatom] = new_res if non_unique: rks = (f"{rk.resnum}{rk.inscode.strip()}" for rk in non_unique) msg = (f"Found non-unique residues {', '.join(rks)} " f"in chain {chain_name}") warnings.warn(SequenceWarning(msg)) seq = SeqClass(residues, name=name, chain=chain_name, structure_chain=chain_name, long_name=long_name, entry_id=eid, entry_name=entry_name, pdb_id=pdb_id, origin=SeqClass.ORIGIN.Maestro) if eid is not None: seq.setResidueMap(residue_map) seqs.append(seq) return sorted(seqs, key=lambda s: s.chain), ssatom_res_map def _detectSequenceType(self, struct_residues): """ Takes a list of structure residues and returns the sequence type for it. 
:param struct_residues: A list of residues :type struct_residues: list(structure._Residue) :return: A sequence class :rtype: type """ res_names = {res.pdbres.upper().strip() for res in struct_residues} return sequence.guess_seq_type(res_names)
    @classmethod
    def convertStructResidue(cls, struct_res, make_res):
        """
        Convert a `structure._Residue` into a `residue.Residue`.

        :param struct_res: A structure residue to convert. If this is a
            `residue.Residue` object, it will be returned unchanged.
        :type struct_res: structure._Residue or residue.Residue

        :param make_res: A method to convert a string into a `residue.Residue`
        :type make_res: callable

        :return: A newly created residue
        :rtype: residue.Residue
        """
        if isinstance(struct_res, residue.Residue):
            return struct_res
        res = make_res(struct_res.pdbres)
        alpha_carbon = struct_res.getAlphaCarbon()
        if alpha_carbon is None:
            b_factor = struct_res.temperature_factor
        else:
            b_factor = alpha_carbon.temperature_factor
            if b_factor is None:
                # Fall back to the residue-level temperature factor if the
                # alpha carbon has none
                b_factor = struct_res.temperature_factor
        if b_factor is None:
            res.b_factor = None
        else:
            res.b_factor = b_factor if b_factor > 0.0 else None
        for item in [
                'secondary_structure', 'resnum', 'inscode', 'molecule_number'
        ]:
            setattr(res, item, getattr(struct_res, item))
        return res
def _addStructureless(self, residues, seqres, make_res): """ Combine the structured and structureless residues. :param residues: A list of structured residues. :type residues: list[schrodinger.structure._Residue] :param seqres: A list of structureless residue names. This must be a `OneIndexedList` since the i_pdb_seqres_index property values are one-indexed. :type seqres: OneIndexedList[str] :param make_res: A method to convert a string into a `residue.Residue` :type make_res: callable :return: A list of structured residues (as `schrodinger.structure._Residue` objects) and structureless residues (as `residue.Residue` objects). :rtype: list """ make_newres = partial(self._seqresToResidues, make_res=make_res) if not residues: # If no structure residues were passed in, we just return # a list of structureless residues. return make_newres(seqres) all_res = seqres.copy() no_seqres_res = collections.defaultdict(list) seqres_index_found = False prev_seqres_index = 0 # add structured residues to all_res if they have a seqres_index # property for cur_res in residues: cur_seqres_index = cur_res.atom[1].property.get( 'i_pdb_seqres_index') if cur_seqres_index is None: # This structured residue doesn't appear in the SEQRES list, so # it will get added to the sequence below no_seqres_res[prev_seqres_index].append(cur_res) elif cur_seqres_index > len(all_res): # The seqres_index value is too large, so we'll put this residue # at the end of the sequence no_seqres_res[len(all_res)].append(cur_res) elif not isinstance(all_res[cur_seqres_index], str): # We've already seen a structured residue with this seqres # index, so we stick this one after it no_seqres_res[cur_seqres_index].append(cur_res) else: # The value in all_res is a string, which means that this is the # first time we've seen this seqres index. Replace the string # residue in all_res with cur_res. seqres_index_found = True all_res[cur_seqres_index] = cur_res prev_seqres_index = cur_seqres_index if not seqres_index_found: # None of the structured residues had valid seqres indices (e.g. # all-DNA chain), so we don't attempt to add structureless residues return residues # add in structured residues that didn't have a valid seqres_index for seqres_index, res_to_insert in sorted(no_seqres_res.items(), reverse=True): # insert res_to_insert after seqres_index instead of before insert_index = seqres_index + 1 all_res[insert_index:insert_index] = res_to_insert # convert any remaining strings to residue objects res_before_seqres = None seqres_to_convert = [] fully_converted = [] for cur_res in all_res: if isinstance(cur_res, str): seqres_to_convert.append(cur_res) continue elif seqres_to_convert: converted = make_newres(seqres_to_convert, start_res=res_before_seqres, end_res=cur_res) fully_converted.extend(converted) seqres_to_convert = [] res_before_seqres = cur_res fully_converted.append(cur_res) if seqres_to_convert: converted = make_newres(seqres_to_convert, start_res=res_before_seqres) fully_converted.extend(converted) return fully_converted def _addStructurelessTerminalOnly(self, residues, seqres, make_res): """ Add any structureless residues that appear at the N- and C-termini, but ignore all other structureless residues. This method should be used for structures generated by Prime prior to 20-3 to work around PRIME-4376. (The i_pdb_seqres_index for modeled loops was populated with meaningless numbers.) See `_addStructureless` for argument and return value documentation. 
""" seqres_indices = [ res.atom[1].property.get('i_pdb_seqres_index') for res in residues ] seqres_indices = [idx for idx in seqres_indices if idx is not None] if not seqres_indices: return residues make_newres = partial(self._seqresToResidues, make_res=make_res) converted = residues.copy() min_seqres = min(seqres_indices) if min_seqres > 1: newres = make_newres(seqres[:min_seqres], end_res=residues[0]) converted[0:0] = newres max_seqres = max(seqres_indices) if max_seqres < len(seqres): newres = make_newres(seqres[max_seqres + 1:], start_res=residues[-1]) converted.extend(newres) return converted def _seqresToResidues(self, seqres, *, make_res, start_res=None, end_res=None): """ Convert the list of residue names to a list of `residue.Residue` objects using the given previous and next residue numbers. :param seqres: A list of residue names. :type seqres: list[str] :param make_res: A method to convert a string into a `residue.Residue` :type make_res: callable :param start_res: Previous residue. Pass None if the residues are N-terminal :type start_res: residue.Residue or NoneType :param end_res: Next residue. Pass None if the residues are C-terminal :type end_res: residue.Residue or NoneType :return: The newly created residues :rtype: list[residue.Residue] """ new_residues = [] for resname in seqres: res = make_res(resname) res.seqres_only = True new_residues.append(res) sequence.assign_residue_numbers(new_residues, start_res, end_res) return new_residues
def _get_name_and_description(props): """ Get the appropriate short name and description from the structure property dictionary :param props: Structure property dictionary :return: A short name and description :rtype: tuple(str, str) """ maestro_title = props.get("s_m_title") pdb_id = props.get("s_pdb_PDB_ID") entry_name = props.get("s_m_entry_name", '') pdb_description = props.get("s_pdb_PDB_TITLE", '') name = maestro_title or entry_name if not name: name = pdb_id or pdb_description else: try: pdbparts = parse_pdb_id(name.replace("_chain", "_")) except GetSequencesException: pass else: name = pdbparts.pdbcode long_name_parts = [pdb_description] if entry_name: long_name_parts.insert(0, entry_name) elif maestro_title: long_name_parts.insert(0, maestro_title) long_name = " | ".join(long_name_parts) return name, long_name
[docs]class MMSequenceConverter(object): """ Converts sequence between mmseq and MSV sequence formats. :note: This is supposed to be used with 'with' context manager. """ def __enter__(self): """ Initializes mmseq and mmseqio. """ mm.mmseq_initialize(mm.error_handler) mm.mmseqio_initialize(mm.error_handler) return self def __exit__(self, exc_type, exc_value, traceback): """ Terminates mmseq and mmseqio. """ mm.mmseq_terminate() mm.mmseqio_terminate() @classmethod def _makeSequence(cls, handle): """ Converts a mmseq sequence object specified by handle to `schrodinger.protein.sequence.Sequence` object. :type handle: mmseq handle :param handle: Handle of mmseq object. :rtype `schrodinger.protein.sequence.Sequence` :return: Sequence in MSV format. """ name = mm.mmseq_get_name(handle) match = re.match("(.*)_[A-Za-z0-9]+$", name) if match is not None: # strip out chain name from the sequence name name = match.group(1) codes = mm.mmseq_get_all_codes(handle) seq = sequence.make_sequence(codes, name=name) seq.chain = mm.mmseq_get_chainstr(handle) return seq @classmethod def _makeMMSEQ(cls, seq): """ Converts a `schrodinger.protein.sequence.ProteinSequence` to mmseq object. :note: The returned handle needs to be deleted by calling mm.mmseq_delete. :type seq: `schrodinger.protein.sequence.ProteinSequence` :param seq: Sequence to be converted. :rtype: int :return: mmseq sequence handle. """ seq_handle = mm.mmseq_new() for index, res in enumerate(seq): if res.is_res: mm.mmseq_insert_code(seq_handle, index, res.short_code, res.long_code) else: mm.mmseq_insert_code(seq_handle, index, '~', ' ') mm.mmseq_res_set_is_gap(seq_handle, index + 1, True) mm.mmseq_res_set_in_use(seq_handle, index + 1, True) mm.mmseq_set_name(seq_handle, seq.name) mm.mmseq_set_chain(seq_handle, seq.chain) return seq_handle
[docs] @classmethod def readSequences(cls, file_name, file_format=mm.MMSEQIO_ANY): """ Reads all sequences from file specified by file_name. :type file_name: str :param file_name: Name of input file. :type file_format: int :param file_format: Format of the input file. By default, the format is MMSEQIO_ANY meaning file type is automatically recognized. :rtype: List of `schrodinger.protein.sequence.Sequence`. :return: List of sequences read from the file. :raise GetSequencesException: If the file could not be read. """ if file_format == mm.MMSEQIO_ANY and file_name.endswith(".msf"): # SHARED-6945: mmseqio does not auto-detect MSF file_format = mm.MMSEQIO_MSF try: f_handle = mm.mmseqio_open_file(file_name, mm.MMSEQIO_READ, file_format) sequences = [] num_seqs = mm.mmseqio_get_num_sequences(f_handle) for seq_index in range(num_seqs): seq_handle = mm.mmseqio_read_sequence(f_handle, seq_index + 1) sequences.append(cls._makeSequence(seq_handle)) mm.mmseqio_close_file(f_handle) except mm.MmException as err: msg = "Error reading %s\n\n%s" % (file_name, str(err)) raise GetSequencesException(msg) return sequences
    @classmethod
    def writeSequences(cls, sequences, file_name,
                       file_format=mm.MMSEQIO_NATIVE):
        """
        Writes sequences to a file specified by file_name.

        :raises mmcheck.MmException: If the file could not be opened for
            writing.

        :type sequences: list of `schrodinger.protein.sequence.ProteinSequence`
        :param sequences: List of sequences to be written to file.

        :type file_name: str
        :param file_name: Name of output file.

        :type file_format: int
        :param file_format: Format of the output file. By default, the format
            is MMSEQIO_NATIVE.
        """
        f_handle = mm.mmseqio_open_file(file_name, mm.MMSEQIO_WRITE,
                                        file_format)
        for seq in sequences:
            seq_handle = cls._makeMMSEQ(seq)
            mm.mmseqio_write_sequence(f_handle, seq_handle)
            mm.mmseq_delete(seq_handle)
        mm.mmseqio_close_file(f_handle)
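# Illustrative sketch ("input.msf" and "converted.seq" are hypothetical file
# names). The converter initializes and terminates the underlying mmseq
# libraries, so it is used as a context manager:
#
#     with MMSequenceConverter() as conv:
#         seqs = conv.readSequences("input.msf")
#         conv.writeSequences(seqs, "converted.seq")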
[docs]class BaseProteinAlignmentReader(object): """ Base class for reading protein sequence alignments from files. """ @staticmethod def _makeSequencesFromElements(sequences): """ Helper function that creates ProteinAlignment from an ordered dictionary of sequence names and elements :param sequences: Ordered dictionary of sequence elements :type sequences: collections.OrderedDict :return: A list of sequences :rtype: list """ return [ sequence.make_sequence(elements, name=name) for name, elements in sequences.items() ]
[docs] @classmethod def read(cls, file_name, AlnCls=alignment.ProteinAlignment): """ Returns alignment read from file :note: The alignment can be empty if no sequence was present in the input file. :param file_name: Source file name :type file_name: str :param AlnCls: The type of the Alignment to return :type AlnCls: type :return: An alignment of the specified type :raises IOError: If file cannot be read """ raise NotImplementedError( "Subclasses should implement this classmethod")
class ClustalAlignmentReader(BaseProteinAlignmentReader):
    """
    Class for reading Clustal `*.aln` files.
    """
[docs] @classmethod def read(cls, file_name, AlnCls=alignment.ProteinAlignment): """ :param file_name: Source file name :type file_name: str :param AlnCls: The type of the Alignment to return :type AlnCls: type :return: An alignment of the specified type """ sequences = collections.OrderedDict() with open(file_name, "r") as f: next(f) # Skip first line for line in f: if not line or line[0].isspace(): continue chunks = line.split() if len(chunks) < 2: continue seq_name = chunks[0] seq_elements = chunks[1] if seq_name in sequences: sequences[seq_name] += seq_elements else: sequences[seq_name] = seq_elements seqs = cls._makeSequencesFromElements(sequences) aln = AlnCls(seqs) return aln
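# Illustrative sketch ("alignment.aln" is a hypothetical file name):
#
#     aln = ClustalAlignmentReader.read("alignment.aln")
#     print(len(aln), "sequences read")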
[docs]class SeqDReader: class _RawProteinSequence: """ A helper class to store raw information about a sequence. After adding raw information, a ProteinSequence can be generated with the `toProteinSequence` method. """ def __init__(self, seq_name, chain_name): self.chain_name = chain_name self.resnums = [] self.rescodes = [] self.ssa_list = [] self.descriptors = collections.defaultdict(list) self.seq_name = seq_name def addResnum(self, resnum): self.resnums.append(int(resnum)) def addRescode(self, rescode): self.rescodes.append(rescode) def addSSA(self, ssa): self.ssa_list.append(ssa) def addDescriptor(self, descriptor_name, descriptor_value): self.descriptors[descriptor_name].append(descriptor_value) def toProteinSequence(self): descriptors = self.descriptors for k, v in descriptors.items(): def cast_values_to_float(values): return list(map(float, values)) try: descriptors[k] = cast_values_to_float(v) except ValueError: # If we can't cast the values to float, we just leave # them as strings. pass seq = sequence.ProteinSequence(self.rescodes, name=self.seq_name, resnums=self.resnums, chain=self.chain_name) for descriptor_name, descriptor_values in descriptors.items(): for res, desc_value in zip(seq, descriptor_values): res.updateDescriptors({descriptor_name: desc_value}) for idx, ssa in enumerate(self.ssa_list): ssa_val = SSA_MAP.get(ssa) seq[idx].secondary_structure = ssa_val return seq
[docs] @classmethod def read(cls, file_name): with open(file_name) as infile: header = infile.readline().strip() seq_name = '' if ',' not in header: seq_name = header header = infile.readline().strip() fieldnames = [field.strip() for field in header.split(',')] for col in SEQD_REQUIRED_COLUMNS: if col not in fieldnames: err_msg = f'SeqD file format requires a "{col}" column.' raise IOError(err_msg) get_ssa = 'SSA' in fieldnames csv_reader = csv.DictReader(infile, fieldnames=fieldnames) raw_seqs = {} # chain name to list of residue names for line in csv_reader: if line['Chain'] not in raw_seqs: raw_seqs[line['Chain']] = cls._RawProteinSequence( seq_name, line['Chain']) raw_seq = raw_seqs[line['Chain']] raw_seq.addRescode(line['ResName']) raw_seq.addResnum(line['ResID']) if get_ssa: raw_seq.addSSA(line['SSA']) for column_name, row_value in line.items(): if column_name in ('ResID', 'Chain', 'ResName', 'SSA'): continue raw_seq.addDescriptor(column_name, row_value) seqs = [] for raw_seq in raw_seqs.values(): seqs.append(raw_seq.toProteinSequence()) return seqs
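# Illustrative sketch of the SeqD layout this reader expects (the optional
# first line is a sequence name; the one-letter residue codes and the
# descriptor column are placeholders, not from the original source):
#
#     my_sequence
#     ResID,Chain,ResName,SSA,Hydrophobicity
#     1,A,M,H,1.9
#     2,A,K,H,-3.9
#
#     seqs = SeqDReader.read("my_sequence.seqd")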
class FastaAlignmentReader:

    @staticmethod
    def _parseCodes(codes):
        """
        Parses string of single-letter amino acid codes, optionally
        interleaved with residue numbers. Converts the codes to uppercase and
        filters out all characters except for A-Z and gap codes.

        :type codes: str
        :param codes: String to parse.

        :rtype: Tuple of (str, list of ints)
        :return: Parsed sequence elements string and list of residue numbers.
        """
        codes = codes.upper()
        if not HAS_DIGIT_RE.search(codes):
            # If there are no digits in the input, don't waste time trying to
            # parse out non-existent residue numbers
            return (codes, ())
        resnum = 1
        nums = ""
        elements = ""
        gaps = ['.', '-', '~']
        numbers = []
        for ch in codes:
            if '0' <= ch <= '9':
                nums += ch
            elif 'A' <= ch <= 'Z' or ch in gaps:
                if nums:
                    resnum = int(nums)
                    nums = ""
                elements += ch
                if ch not in gaps:
                    numbers.append(resnum)
                    resnum += 1
        return (elements, numbers)
[docs] @classmethod def parseSSA(cls, seq): """ Parse a SSA sequence into a list of SSA values that can be assigned to residues' `secondary_structure` property :param seq: the "sequence" from the FASTA file which encodes the SSA values :type seq: str :return: a list of the SSA values. The SSA values come from schrodinger.structure. Returns None if any of the elements was invalid :type: list(int) or NoneType """ seq = seq.replace('\n', '') anno_vals = [] for ele in seq: val = SSA_MAP.get(ele, INVALID_SSA_VAL) if val is INVALID_SSA_VAL: return None anno_vals.append(val) return anno_vals
[docs] @classmethod def read(cls, file_name, AlnClass=alignment.ProteinAlignment): """ Loads a sequence file in FASTA format, creates sequences and appends them to alignment. Splits sequence name from the FASTA header. :param file_name: name of input FASTA file :type file_name: str :param AlnClass: The class of the alignment object to return :type AlnClass: type :return: Read alignment. :rtype: AlnClass """ with open(file_name) as f: lines = [line for line in f if line] return cls.readFromText(lines, AlnClass)
[docs] @classmethod def readFromText(cls, lines, AlnClass=alignment.ProteinAlignment): """ Read sequences from FASTA-formatted text, creates sequences and appends them to alignment. Splits sequence name from the FASTA header. :param lines: list of strings representing FASTA file :type lines: list of str :param AlnClass: The class of the alignment object to return :type AlnClass: type :return: The alignment :rtype: AlnClass """ seq_list = [] fasta_parts = None for group in _partition_by_predicate( lines, lambda x: x[0] in FASTA_COMMENT_CHARS): cur_header = group[0] seq_rows = group[1:] # Store first comment row to use as long_name if fasta_parts is None or cur_header[0] == ">": header = cur_header fasta_parts = parse_fasta_header(header) if len(seq_rows) == 0: continue seq = "".join(seq_rows) # Apply annotation values to the previous sequence which was parsed # but only if the ssa's and sequence's name and chain name match if fasta_parts.anno_type and len(seq_list): prev_seq = seq_list[-1] if (prev_seq.chain == fasta_parts.chain and prev_seq.name == fasta_parts.name and prev_seq.long_name == fasta_parts.long_name and fasta_parts.anno_type is PSAnno.ANNOTATION_TYPES.secondary_structure): anno_values = cls.parseSSA(seq) if anno_values: for idx, val in enumerate(anno_values): prev_seq[idx].secondary_structure = val continue elements, numbers = cls._parseCodes(seq) seq = sequence.make_sequence(elements, name=fasta_parts.name, long_name=fasta_parts.long_name, chain=fasta_parts.chain, resnums=numbers) seq_list.append(seq) # Reset fasta_parts after sequences have been processed fasta_parts = None return AlnClass(sequences=seq_list)
[docs] @classmethod def readFromStringList(cls, strings, AlnClass=alignment.ProteinAlignment): """ Return an alignment object created from an iterable of sequence strings :param strings: Sequences as iterable of strings (1D codes) :type strings: Iterable of strings :param AlnClass: The class of the alignment object to return :type AlnClass: type :return: The alignment :rtype: AlnClass """ fastalines = ((">{0}".format(i), ln) for i, ln in enumerate(strings)) return cls.readFromText(itertools.chain(*fastalines), AlnClass)
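# Illustrative sketch (the sequence strings are placeholders; the alignment
# length reflects the number of sequences read):
#
#     >>> aln = FastaAlignmentReader.readFromStringList(["ACDEF", "ACD-F"])
#     >>> len(aln)
#     2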
################################################################################
# Writers
################################################################################


def to_biopython(seq):
    """
    Converts a sequence to a Biopython sequence

    :param seq: A sequence to convert to a Biopython sequence
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :return: The sequence converted to a Biopython SeqRecord
    :rtype: Bio.SeqRecord.SeqRecord
    """
    seq_str = str(seq).replace(seq.gap_char, '')
    return BioSeqRecord(BioSeq(seq_str), name=seq.name)
[docs]class BaseProteinAlignmentWriter(object): """ Class for writing protein alignments to files. """ @staticmethod def _makeSeqName(seq, aln, index, make_unique, use_long_name=True): """ Returns a name for the sequence. If make_unique is True, guarantees that no other seq in the aln will have this name """ def get_name(the_seq): if use_long_name and the_seq.long_name: return the_seq.long_name return the_seq.name name = get_name(seq) if not make_unique: return name uniques = {(get_name(seq_), seq_.chain) for seq_ in aln} use_chains = len(uniques) == len(aln) uniquifier = str(index + 1) if use_chains and seq.chain: uniquifier = seq.chain return f"{name}_{uniquifier}"
[docs] @classmethod def write(cls, aln, file_name, **kwargs): """ Writes aln to a file. :type aln: `BaseAlignment` :param aln: Alignment to be written to a file. :type file_name: str :param file_name: Destination file name. :Note: Subclasses may take additional `**kwargs` as write options """ raise NotImplementedError( "Subclasses should implement this classmethod")
class FastaAlignmentWriter(BaseProteinAlignmentWriter):
    """
    Class for writing FASTA .fasta files.

    Format is described here:
    U{Fasta format wikipedia<https://en.wikipedia.org/wiki/FASTA_format>}
    """
    HEADER_START = '>'
    HEADER_END = ''

    @classmethod
    def toString(cls, aln, use_unique_names=True, maxl=50):
        return cls.toStringAndNames(aln,
                                    use_unique_names=use_unique_names,
                                    maxl=maxl)[0]
[docs] @classmethod def toStringAndNames(cls, aln, use_unique_names=True, maxl=50, export_annotations=False, sim_ref_seq=None): """ Converts aln to FASTA string :param aln: Structured sequences :type aln: `ProteinAlignment` :param use_unique_names: If True, write unique name for each sequence. :type use_unique_names: bool :param maxl: Maximum length of a line :type maxl: int :param export_annotations: Whether annotations should be exported along with sequence information. If True, annotations listed in `EXPORT_ANNOTATIONS` will be exported. :type export_annotations: bool :param sim_ref_seq: Reference sequence to calculate similarities for the sequences to be exported. If None, similarity will not be exported. :type sim_ref_seq: `sequence.Sequence` or None :return: FASTA string :rtype: string """ outlines = [] names = [] for index, seq in enumerate(aln): # Format title name = cls._makeSeqName(seq, aln, index, use_unique_names, use_long_name=True) names.append(name) h_start = cls.HEADER_START h_end = cls.HEADER_END chain = seq.chain if seq.chain is not None else "" header_name = NAME_STR.format(name=name, chain=chain) base_header = h_start + header_name if sim_ref_seq: ident = seq.getIdentity(sim_ref_seq) * 100 sim = seq.getSimilarity(sim_ref_seq) * 100 hom = seq.getConservation(sim_ref_seq) * 100 base_header += SIM_STR.format(id=ident, sim=sim, hom=hom) header = base_header + h_end # Format sequence (wrapped at maxl) s = str(seq) s = s.replace(seq.gap_char, "-") # FASTA uses "-" to denote gaps cls._appendSeqOrAnnotationLine(header, s, outlines, maxl) if export_annotations: for header_ext, anno in EXPORT_ANNOTATIONS.items(): anno_header = base_header + header_ext + h_end anno_vals = getattr(seq.annotations, anno.name) if anno == PSAnno.ANNOTATION_TYPES.secondary_structure: anno_vals = [SSA_MAP_RV.get(v, '-') for v in anno_vals] anno_str = ''.join(anno_vals) cls._appendSeqOrAnnotationLine(anno_header, anno_str, outlines, maxl) outstr = '\n'.join(outlines) return outstr, names
@staticmethod def _appendSeqOrAnnotationLine(header, row, outlines, maxl): """ Append a header and sequence or annotation info to the list of rows. :param header: Header for this row :type header: str :param row: Sequence or annotation row :type row: str :param outlines: List of output lines to append row to :type outlines: list :param maxl: Max line length :type maxl: int """ outlines.append(header) s = row outlines.extend( [s[i * maxl:(i + 1) * maxl] for i in range(len(s) // maxl + 1)])
[docs] @classmethod def toStringList(cls, aln): """ Convert ProteinAlignment object to list of sequence strings :param aln: Alignment data :type aln: `ProteinAlignment` :rtype: list of str :return: A list of sequence strings representing the alignment """ fastastr = cls.toString(aln, maxl=10000).split('\n') return [s for s in fastastr if '>' not in s]
[docs] @classmethod def write(cls, aln, file_name, use_unique_names=True, maxl=50, export_annotations=False, sim_ref_seq=None, **kwargs): """ Write aln to FASTA file :raises IOError: If output file cannot be written. :param aln: Structured sequences :type aln: `ProteinAlignment` :param use_unique_names: If True, write unique name for each sequence. :type use_unique_names: bool :param maxl: Maximum length of a line :type maxl: int :type file_name: str :param file_name: Destination file name. :param export_annotations: Whether annotations should be exported along with sequence information. If True, annotations listed in `EXPORT_ANNOTATIONS` will be exported. :type export_annotations: bool :param sim_ref_seq: Reference sequence to calculate similarities for the sequences to be exported. If None, similarity will not be exported. :type sim_ref_seq: `sequence.Sequence` or None :return: output names of each sequence :rtype: list of str """ with open(file_name, 'w') as f: fasta_str, names = cls.toStringAndNames( aln, use_unique_names=use_unique_names, maxl=maxl, export_annotations=export_annotations, sim_ref_seq=sim_ref_seq) f.write(fasta_str) return names
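# Illustrative sketch ("out.fasta" is a hypothetical file name; `aln` is an
# existing ProteinAlignment). The returned names can later be passed to
# reorder_fasta_alignment, defined at the end of this module:
#
#     names = FastaAlignmentWriter.write(aln, "out.fasta", maxl=60)
#     fasta_str = FastaAlignmentWriter.toString(aln)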
class ClustalAlignmentWriter(BaseProteinAlignmentWriter):
    """
    Class for writing Clustal `*.aln` files.

    The format is described here:
    `http://meme-suite.org/doc/clustalw-format.html`
    """
[docs] @classmethod def write(cls, aln, file_name, use_unique_names=True, **kwargs): """ Writes aln to a Clustal alignment file. Note: `**kwargs` are ignored, to preserve signature of BaseProteinAlignmentWriter :raises IOError: If output file cannot be written. :type aln: `BaseAlignment` :param aln: Alignment to be written to a file. :type file_name: str :param file_name: Destination file name. :param use_unique_names: If True, write unique name for each sequence. :type use_unique_names: bool :rtype: dict :return: A mapping of names written to the clustal file and sequences """ # prepare list of sequences, replace '~' with '.' name_seq_mapping = {} sequences = collections.OrderedDict() for index, seq in enumerate(aln): name = cls._makeSeqName(seq, aln, index, use_unique_names, use_long_name=False) name = name.replace(' ', '_') name_seq_mapping[name] = seq # clustal uses "-" to denote gaps sequences[name] = list(str(seq).replace(seq.gap_char, '-')) # remove terminal gaps while len(sequences[name]) and sequences[name][-1] == '-': sequences[name].pop() # Pad sequences with gaps so they're rectangular max_len = max(len(seq) for seq in sequences.values()) for name, elements in sequences.items(): elements += ['-'] * (max_len - len(elements)) sequences[name] = ''.join(elements) name_length = max(map(len, list(sequences))) + 1 lines = [] # header lines.append("CLUSTAL W (2.0) multiple sequence alignment\n\n") # write 60 characters per line line_length = 60 pos = 0 sequences_left = True while sequences_left: sequences_left = False for name, elements in sequences.items(): line = name.ljust(name_length) if pos < len(elements): line += elements[pos:pos + line_length] if pos + line_length < len(elements): sequences_left = True lines.append(line + '\n') lines.append('\n') pos += line_length with open(file_name, "w") as f: f.writelines(lines) return name_seq_mapping
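# Illustrative sketch ("alignment.aln" is a hypothetical file name; `aln` is
# an existing alignment). The returned mapping relates written names to the
# original sequences:
#
#     name_to_seq = ClustalAlignmentWriter.write(aln, "alignment.aln")
#     for name, seq in name_to_seq.items():
#         print(name, seq.chain)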
[docs]class CSVAlignmentWriter(BaseProteinAlignmentWriter):
[docs] @classmethod def write(cls, aln, file_name, export_descriptors=False, **kwargs): descriptor_names = [] descriptor_values = [] if export_descriptors: descriptors = aln.getSeqsDescriptors() descriptor_names = [desc.display_name for desc in descriptors] with csv_unicode.writer_open(file_name) as fh: csv_writer = csv.writer(fh) headers = ['ID', 'Name', 'Chain', 'Sequence'] + descriptor_names csv_writer.writerow(headers) for idx, seq in enumerate(aln, start=1): seq_name = seq.name chain = seq.chain seq_str = str(seq) if export_descriptors: descriptor_values = [ seq.getProperty(desc) if seq.getProperty(desc) else '' for desc in descriptors ] row = [idx, seq_name, chain, seq_str] + descriptor_values csv_writer.writerow(row)
class SeqDAlignmentWriter(BaseProteinAlignmentWriter):
    """
    Class to write sequences and descriptors to a seqd file.

    Each sequence is exported to a seqd file with name
    "<seq_name>_<chain_name>.seqd"
    """
[docs] @classmethod def write(cls, aln, file_name, export_descriptors=True, export_annotations=False, **kwargs): for seq in aln: seq_name = seq.name headers = list(SEQD_REQUIRED_COLUMNS) anno_to_add = False if export_annotations: for name, anno in EXPORT_ANNOTATIONS.items(): anno_header = name.strip("|") anno_to_add = anno_header not in headers if anno_to_add: headers.append(anno_header) anno_vals = getattr(seq.annotations, anno.name) if anno == PSAnno.ANNOTATION_TYPES.secondary_structure: anno_vals = [ SSA_MAP_RV.get(v, '-') for v in anno_vals ] if export_descriptors: headers += seq[0].getDescriptorKeys() with csv_unicode.writer_open(file_name) as fh: csv_writer = csv.writer(fh) for row in ([seq_name], headers): csv_writer.writerow(row) for i, res in enumerate(seq): row = [res.resnum, res.chain, res.short_code] if anno_to_add: row.append(anno_vals[i]) if export_descriptors: descriptors = [ res.getDescriptorValue(desc) for desc in res.getDescriptorKeys() ] row += descriptors assert len(row) == len(headers) csv_writer.writerow(row)
def is_inhouse_header(fasta_header):
    """
    Test that the given fasta header is of the in-house format.

    In-house format is given by `">NAME:<long_name>|CHAIN:<chain>"` with an
    optional `"|<anno_type>"` flag on the end.

    Example::

        >NAME:ABC|CHAIN:X|SSA
        >NAME:A|B|C|CHAIN:X x

    :param fasta_header: The fasta header to check
    :type fasta_header: str

    :return: Whether it is or isn't the in-house format
    :rtype: bool
    """
    header_regex = NAME_FLAG + r".*\|" + CHAIN_FLAG + r".*"
    return bool(re.findall(header_regex, fasta_header))
[docs]def parse_in_house_header(fasta_header): """ Test that the given fasta header is of the in house format In house format is given by `">NAME:<long_name>|CHAIN:<chain>"` with an optional `"|<anno_type>"` flag on the end.: Example:: >NAME:ABC LONG|CHAIN:X|SSA --> ABC LONG, X, secondary_structure >NAME:A|B|C|CHAIN:X x --> A|B|C, X, None :param fasta_header: The fasta header to parse :type fasta_header: str :return: the long_name, chain and annotation type corresponding to the header :rtype: tuple(str, str, PSAnno.ANNOTATION_TYPES) or NoneType) """ assert is_inhouse_header(fasta_header), ( "Header does not follow in-house format") for key, value in EXPORT_ANNOTATIONS.items(): if key in fasta_header: anno_type = value fasta_header = fasta_header.replace(key, "") break else: anno_type = None name_part, chain = fasta_header.rsplit("|CHAIN:", 1) long_name = name_part.replace(NAME_FLAG, "", 1) return long_name, chain, anno_type
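# Illustrative sketch (the header values are placeholders):
#
#     >>> is_inhouse_header("NAME:ABC|CHAIN:A")
#     True
#     >>> parse_in_house_header("NAME:ABC|CHAIN:A")[:2]
#     ('ABC', 'A')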
[docs]def parse_fasta_header(header, permissive=True): """ Parse a FASTA header into a (name, long_name, chain, anno_type) Named tuple. :param header: The header for a single entry in a FASTA file (including leading comment character) :type header: str :param permissive: Whether to use permissive parsing. See `parse_pdb_id` for documentation. :type permissive: bool :return: Named tuple of (name, long_name, chain, anno_type) :type: FastaParts """ assert header[0] in FASTA_COMMENT_CHARS, "Cannot parse non-FASTA header" # Remove leading comment char, leading whitespace, and trailing newline # Note: not using strip() because a trailing space can be a chain ID header = header.lstrip("".join(FASTA_COMMENT_CHARS)).lstrip().rstrip("\r\n") if is_inhouse_header(header): # First try custom format used by FastaAlignmentWriter long_name, chain, anno_type = parse_in_house_header(header) short_name, _ = parse_long_name(long_name, permissive) else: long_name = header anno_type = None short_name, chain = parse_long_name(long_name, permissive) return FastaParts(short_name, long_name, chain, anno_type)
[docs]def parse_long_name(long_name, permissive=True): """ Attempt to parse a long_name into a short name and a chain. Example: 1FSK:A --> 1FSK, A 2BJM.H VH CDR_LENGTH: 5 17 11 --> 2BJM, H sp|accession|entry name --> accession, "" :param long_name: The long name to attempt to parse :type long_name: str :param permissive: Whether to use permissive parsing. See `parse_pdb_id` for documentation. :type permissive: bool :return: A short name and a chain id :rtype: PdbParts """ pdbparts = PdbParts(pdbcode="", pdbchain="") # Next try NCBI formats pipe_parts = long_name.split("|", 1) if len(pipe_parts) == 2 and pipe_parts[0] in NCBI_FLAGS: # Get first whitespace-separated word ncbi_start = long_name.strip().split(None, 1)[0] ncbi_parts = ncbi_start.split("|") chain = "" if ncbi_parts[0] == "pdb" and len(ncbi_parts) >= 3: name_parts = ncbi_parts[1:2] chain = ncbi_parts[2] else: name_parts = (part for part in ncbi_parts[1:] if part) return PdbParts(pdbcode="|".join(name_parts), pdbchain=chain) # Split by separator and/or whitespace # Only split by "|" if there is no whitespace if len(long_name.strip().split(None, 1)) == 1: long_name = long_name.replace("|", " ") separators = [","] for sep in separators: long_name = long_name.replace(sep, " ") parts = long_name.split(None, 1) if not parts: return pdbparts # Use first "word" as name split_name = parts[0] try: pdbparts = parse_pdb_id(split_name, permissive=permissive) except GetSequencesException: # If parsing fails, don't extract a chain ID return PdbParts(pdbcode=split_name, pdbchain=pdbparts.pdbchain) else: return pdbparts
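# Illustrative sketch (the headers are placeholders; outputs follow from the
# parsing rules above):
#
#     >>> parse_fasta_header(">1FSK:A some description")
#     FastaParts(name='1FSK', long_name='1FSK:A some description', chain='A', anno_type=None)
#     >>> parse_long_name("pdb|4HHB|A")
#     PdbParts(pdbcode='4HHB', pdbchain='A')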
[docs]def reorder_fasta_alignment(aln, orig_names): """ Reorder a FASTA alignment to match the order of names written to FASTA. Intended for use after alignment methods that reorder the output. Example usage:: orig_names = seqio.FastaAlignmentWriter.write(orig_aln, input_filename) # run alignment method aln = seqio.FastaAlignmentReader.read(out_filename) reorder_fasta_alignment(aln, orig_names) :param aln: Alignment to reorder. Will be modified in place. :type aln: alignment.BaseAlignment :param orig_names: Original order of sequence names as written to FASTA. :type orig_names: list[str] :raises ValueError: If the alignments have different lengths or mismatched names """ if len(orig_names) != len(aln): raise ValueError("The length of the alignment and names do not match " f"({len(aln)}, {len(orig_names)})") new_long_names = [seq.long_name for seq in aln] reorder_indices = [] for name in orig_names: if name in new_long_names: idx = new_long_names.index(name) else: raise ValueError( f"Could not find a sequence corresponding to {name}") reorder_indices.append(idx) aln.reorderSequences(reorder_indices)