Source code for schrodinger.livedesign.rgfile_parse

"""
Prototype for an RGfile parser. This will eventually be implemented in RDKit.

The RG parser is more general than the SD one, and any .sdf file should parse ok with it.

- Only V3000 format is supported.
- We only support default RLOGIC settings. Allowing RLOGIC would require our
  MolAddRecursiveQueries() strategy to construct the queries, and require a
  complex query enumeration strategy.
"""
import logging
import re
from typing import Iterator
from typing import List
from typing import Optional
from typing import TextIO
from typing import Union

from rdkit import Chem

# Line prefixes/markers used to recognize the sections of a V3000 RGfile.
RGROUP_BEGIN_MARK = 'M  V30 BEGIN RGROUP '
RGROUP_END_MARK = 'M  V30 END RGROUP'
RGROUP_RLOGIC_PREFIX = 'M  V30 RLOGIC '
CTAB_END_MARK = 'M  V30 END CTAB'
MOL_END_MARK = 'M  END'

# Matches any single non-zero digit. This must be a character class: the
# plain pattern '123456789' would only match that literal 9-character string.
NON_ZERO_NUMBERS = re.compile(r'[1-9]')

logger = logging.getLogger(__name__)


def _check_unsupported_rlogic(rlogic_line: str):
    """
    Raise a RuntimeError if RLOGIC specifies any non default 'thenR' or
    'RestH', or if 'Occur' is present and matches the NON_ZERO_NUMBERS
    pattern.

    :param rlogic_line: the RLOGIC line in a RGROUP block.

    :raises RuntimeError: if non-default values are present, or if the line
        does not carry both the 'thenR' and 'RestH' fields
    """
    stripped_line = rlogic_line.strip()
    rlogic_data = stripped_line[len(RGROUP_RLOGIC_PREFIX):].split()

    # Without this guard a truncated RLOGIC line would surface as an
    # IndexError instead of the RuntimeError this function documents.
    if len(rlogic_data) < 2:
        raise RuntimeError(
            f'Malformed RLOGIC line in RGfile: {stripped_line!r}')

    then_r, rest_h = rlogic_data[:2]
    # The third field is optional; an absent one counts as default.
    occur = rlogic_data[2] if len(rlogic_data) > 2 else ''

    if then_r != '0' or rest_h != '0' or NON_ZERO_NUMBERS.search(occur):
        raise RuntimeError('Non-default RLOGIC in RGfiles is not supported.')


def _parse_rgroup_block(data_src: Union[Iterator[str],
                                        TextIO], header: str, sanitize: bool,
                        removeHs: bool, strictParsing: bool) -> List[Chem.Mol]:
    """
    Read one RGROUP block from the stream and turn each of its CTABs into a
    mol. An RLOGIC line, when it is the first line of the block, is validated
    to hold only default values.

    Reading stops after the line that starts with the RGROUP end mark has
    been consumed. Running out of input lets StopIteration propagate.

    :param data_src: an iterator over the RG molblock being parsed
    :param header: the header in the RGfile, prepended to every CTAB
    :param sanitize: whether to sanitize the mols when parsing them
    :param removeHs: whether to remove Hs from the parsed mols
    :param strictParsing: whether to enable strict parsing for the mols

    :returns: a list of the mol queries in this RGroup

    :raises RuntimeError: if one of the CTABs cannot be parsed into a mol
    """
    current = next(data_src)
    if current.startswith(RGROUP_RLOGIC_PREFIX):
        _check_unsupported_rlogic(current)
        current = next(data_src)

    parsed = []
    while not current.startswith(RGROUP_END_MARK):
        # Rebuild a complete molblock: shared header + this CTAB + 'M  END'
        pieces = [header]
        while not current.startswith(CTAB_END_MARK):
            pieces.append(current)
            current = next(data_src)
        pieces.extend((current, MOL_END_MARK))

        query = Chem.MolFromMolBlock(''.join(pieces), sanitize, removeHs,
                                     strictParsing)
        if query is None:
            raise RuntimeError(
                f'RGroup mol at position {len(parsed) + 1} failed to parse.')
        parsed.append(query)

        current = next(data_src)

    return parsed


def _add_rgroup_queries(mol: Chem.Mol, rgroup_id: str,
                        rgroup_mols: List[Chem.Mol]) -> Chem.Mol:
    """
    Attach the mols of one RGroup block to the core mol: every atom whose
    symbol equals the RGroup ID is tagged with a 'query' property naming all
    the RGroup mols, and the tags are then resolved through
    Chem.MolAddRecursiveQueries().

    :param mol: the core mol on which to add the subqueries
    :param rgroup_id: the label of the current RGroup being added.
    :param rgroup_mols: the mols to be added as subqueries

    :returns: the updated mol
    """
    # Name each RGroup mol by its position in the block.
    subqueries = {}
    for position, query in enumerate(rgroup_mols):
        subqueries[f'subquery_{position}'] = query
    query_prop_value = ','.join(subqueries)

    for at in mol.GetAtoms():
        if at.GetSymbol() != rgroup_id:
            continue
        at.SetProp('query', query_prop_value)

    Chem.MolAddRecursiveQueries(mol, subqueries, 'query')

    return mol


def _skip_to_next_mol(data_src: Union[Iterator[str], TextIO]):
    """
    Consume and drop input lines up to and including the next line that
    starts with the mol end mark. If the input runs out first, return
    quietly.
    """
    while True:
        try:
            discarded = next(data_src)
        except StopIteration:
            return
        if discarded.startswith(MOL_END_MARK):
            return


def _fail_parsing(msg, strictParsing):
    """
    Log a parsing failure, then either raise it or report it as None.

    :param msg: the description of the failure
    :param strictParsing: if truthy, raise instead of returning

    :returns: None, when strictParsing is falsy

    :raises RuntimeError: carrying `msg`, when strictParsing is truthy
    """
    logger.error(msg)
    if not strictParsing:
        return None
    raise RuntimeError(msg)


def parse_rgmol(data_src: Union[Iterator[str], TextIO],
                sanitize: bool = True,
                removeHs: bool = True,
                strictParsing: bool = True) -> Optional[Chem.Mol]:
    """
    Parse a V3000 RG mol from an iterable stream

    :param data_src: the iterator over the RG molblock being parsed
    :param sanitize: whether to sanitize the different mol when parsing them
    :param removeHs: whether to remove Hs from the parsed mols
    :param strictParsing: whether to enable strict parsing for the mols

    :returns: a mol with subqueries for each of the defined RGROUPS, or None
        if parsing failed and strictParsing is disabled

    :raises StopIteration: if the input runs out while reading the 4-line
        header
    :raises RuntimeError: if parsing fails and strictParsing is enabled
    """

    # Grab the 4-line header and store it away to use it for the RGROUP blocks.
    # This is the simplest way to pass on the same 2D/3D and V2000/V3000 flags.
    # We might find some EOF blank lines here, so we allow StopIteration to be
    # raised from here. (Keep this an explicit loop: inside a generator
    # expression the StopIteration would be turned into a RuntimeError.)
    header = ''
    for _ in range(4):
        header += next(data_src)

    # The header for V2000 files is different from the V3000 one.
    if header.startswith('$MDL'):
        _skip_to_next_mol(data_src)
        return _fail_parsing("V2000 RGfiles are not supported", strictParsing)
    if 'V3000' not in header:
        _skip_to_next_mol(data_src)
        return _fail_parsing("Could not find a V3000 RGfile header.",
                             strictParsing)

    # Collect the core mol lines up to the first RGROUP block or the end of
    # the mol. 'terminator' stays None if the input runs out before either
    # marker is seen (a 'for' loop never raises StopIteration itself).
    core_lines = [header]
    terminator = None
    for line in data_src:
        if line.startswith((RGROUP_BEGIN_MARK, MOL_END_MARK)):
            terminator = line
            break
        core_lines.append(line)

    # A truncated input may lack the final newline; make sure the end mark
    # lands on its own line.
    if not core_lines[-1].endswith('\n'):
        core_lines.append('\n')
    core_lines.append(MOL_END_MARK)

    mol = Chem.MolFromMolBlock(''.join(core_lines), sanitize, removeHs,
                               strictParsing)
    if mol is None:
        # If we stopped at an RGROUP block, the rest of this record is still
        # unread: skip it, as the other failure paths do.
        if terminator is not None and not terminator.startswith(MOL_END_MARK):
            _skip_to_next_mol(data_src)
        return _fail_parsing("Failed parsing the 'core' mol.", strictParsing)

    if terminator is None:
        # We ran out of data without seeing any RGROUP block or end mark, but
        # we could still parse a mol from what we have seen.
        return mol

    # Parse and add the R Groups
    line = terminator
    try:
        while not line.startswith(MOL_END_MARK):
            rgroup_id = 'R' + line[len(RGROUP_BEGIN_MARK):].strip()
            rgroup_mols = _parse_rgroup_block(data_src, header, sanitize,
                                              removeHs, strictParsing)
            mol = _add_rgroup_queries(mol, rgroup_id, rgroup_mols)
            line = next(data_src)
    except Exception as exc:
        _skip_to_next_mol(data_src)
        return _fail_parsing(f'Failed parsing RG Groups: {str(exc)}',
                             strictParsing)

    return mol


class ForwardRGMolSupplier:
    """
    Lazy, forward-only iterator over the mols of an RGfile, modeled after
    RDKit's ForwardSDMolSupplier.

    Only V3000 files and default RLOGIC values are supported. The data
    source may be either a file object or a string.
    """

    def __init__(self,
                 data_src: Union[TextIO, str, None] = None,
                 sanitize: bool = True,
                 removeHs: bool = True,
                 strictParsing: bool = True):
        """
        :param data_src: the file object or string to read from, if any
        :param sanitize: whether to sanitize the mols when parsing them
        :param removeHs: whether to remove Hs from the parsed mols
        :param strictParsing: whether to enable strict parsing for the mols
        """
        self.setData(data_src, sanitize, removeHs, strictParsing)

    def setData(self,
                data_src: Union[TextIO, str, None],
                sanitize: Optional[bool] = None,
                removeHs: Optional[bool] = None,
                strictParsing: Optional[bool] = None):
        """
        Replace the data source of the iterator and, optionally, the mol
        parsing options. An option passed as None keeps its current value.
        """
        # Strings are turned into an iterator over their lines, line ends
        # kept; anything else is used as given.
        self.data_src = (iter(data_src.splitlines(keepends=True))
                         if isinstance(data_src, str) else data_src)

        requested = (
            ('sanitize', sanitize),
            ('removeHs', removeHs),
            ('strictParsing', strictParsing),
        )
        for option, value in requested:
            if value is not None:
                setattr(self, option, value)

    def __iter__(self):
        return self

    def __next__(self) -> Chem.Mol:
        source = self.data_src
        if source is None:
            raise RuntimeError(
                'No data source was specified for the ForwardRGMolSupplier')

        try:
            return parse_rgmol(source, self.sanitize, self.removeHs,
                               self.strictParsing)
        finally:
            # Whether a mol was parsed or parsing failed, the next line
            # should be a '$$$$' separator: consume it. It may be missing
            # (single mol file/molblock), so running out of input here is not
            # an error. If the line was not a separator, the next iteration
            # will hit trouble and raise.
            next(source, None)