Source code for schrodinger.structure._io

"""
Structure reading/writing.

`StructureReader` provides a convenient iterator to read structures from
files, and `StructureWriter` provides an efficient means of writing many
structures to a file.

`MultiFileStructureReader` iterates through all the structures in multiple
files using `StructureReader`.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import collections
import contextlib
import csv  # For SmilesCsvReader
import enum
import gzip
import os
import re
import tempfile
import warnings
from contextlib import contextmanager
from functools import partial

from schrodinger import adapter
from schrodinger.infra import canvas
from schrodinger.infra import mm
from schrodinger.infra import mmcheck
from schrodinger.infra import structure as infrastructure
from schrodinger.structure._structure import Structure
from schrodinger.structure._structure import _StructureProperty
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import mmutil
from schrodinger.utils import subprocess
from schrodinger.utils.fileutils import CIF
from schrodinger.utils.fileutils import MAESTRO
from schrodinger.utils.fileutils import MOL2
from schrodinger.utils.fileutils import PDB
from schrodinger.utils.fileutils import PHASE_HYPO
from schrodinger.utils.fileutils import SD
from schrodinger.utils.fileutils import SMILES
from schrodinger.utils.fileutils import SMILESCSV
from schrodinger.utils.fileutils import XYZ

# Placeholders for lazy (circular) module imports.
smiles = None

# Values accepted by the "stereo" option of the structure writers.
NO_STEREO = "none"
STEREO_FROM_GEOMETRY = "geometry"
STEREO_FROM_ANNOTATION = "annotation"
STEREO_FROM_ANNOTATION_AND_GEOM = "annotation_and_geom"
STEREO_FROM_3D = "3d"

# STEREO_FROM_3D is a deprecated version of STEREO_FROM_GEOMETRY, so is not
# included
stereo_options = {
    NO_STEREO,
    STEREO_FROM_GEOMETRY,
    STEREO_FROM_ANNOTATION,
    STEREO_FROM_ANNOTATION_AND_GEOM,
}


@contextmanager
def _suppress_error_reporting(error_handler):
    """
    Turn off reporting on an mmerr handler for the duration of a `with` block.

    The handler's current level is read first and written back on exit,
    whether or not the body raised.

    :type error_handler: int
    :param error_handler: The handle of the mmerr object to silence.
    """
    previous_level = mm.mmerr_get_level(error_handler)
    mm.mmerr_level(error_handler, mm.MMERR_OFF)
    try:
        yield
    finally:
        # Always put the caller's level back.
        mm.mmerr_level(error_handler, previous_level)


class _ReaderWriterContextManager(object):
    """
    Mixin that lets reader and writer classes be used in `with` statements.

    Entering the context yields the object itself; leaving it calls the
    object's `close` method when one exists.
    """

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        # Some subclasses define no close(); leaving the context is then a
        # no-op. Exceptions from the body are never suppressed (None is
        # returned on every path).
        if not hasattr(self, 'close'):
            return
        self.close()


class MaestroTextReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a Maestro format file. The structures
    returned are TextualStructure objects. These allow read-only access to the
    Structure-level properties but not to atoms or any properties which
    rely on atoms.
    """
    read_mode = mm.M2IO_READ_FORWARD

    def __init__(self, filename, index=1, error_handler=None):
        """
        Initialize the reader.

        :type filename: string
        :param filename: The filename to read.

        :type index: int
        :param index: The index of the first structure to read.

        :type error_handler: int
        :param error_handler: The handle of the mmerr object to use for
            error logging. Defaults to schrodinger.infra.mm.error_handler.
        """
        self._index = index

        if error_handler is None:
            error_handler = mm.error_handler

        mm.m2io_initialize(error_handler)
        self.error_handler = error_handler
        # The file is opened lazily by the first __next__() call.
        self.fh = None
        self.filename = filename

    def __del__(self, _m2io_terminate=mm.m2io_terminate):
        # The terminate function is bound as a default argument so it is
        # still reachable if module globals are gone at interpreter shutdown.
        self.close()
        _m2io_terminate()

    # required for iterator support
    def __iter__(self):
        return self

    def _m2io_open_file(self):
        """
        Open `self.filename` and skip ahead to the requested start index.

        :raise StopIteration: if the open fails with M2IO_ERR and the file
            has zero size.
        :raise mm.MmException: for any other open failure.
        """
        try:
            self.fh = mm.m2io_open_file(self.filename, self.read_mode)
        except mm.MmException as e:
            # If m2io_open_file returned M2IO_ERR, check to see if this is
            # due to an empty file.
            if e.rc == mm.M2IO_ERR and os.path.getsize(self.filename) == 0:
                raise StopIteration()
            else:
                raise

        if self._index > 1:
            # Position on the CT block just before the first one wanted, so
            # the next "goto next block" lands on index `self._index`.
            mm.m2io_goto_block(self.fh, mm.M2IO_BLOCK_WILDCARD_CT,
                               self._index - 1)
            mm.m2io_leave_block(self.fh)

    def __next__(self):
        """
        Return the next structure from the file as a TextualStructure.

        :raise StopIteration: on EOF or zero size file.
        :raise Exception: if the structure could not be read; the underlying
            mm.MmException is chained as the cause.
        """
        parse_list = []

        if self.fh is None:
            # First iteration; open the file:
            self._m2io_open_file()

        try:
            txt = mm.m2io_goto_next_block_as_text(self.fh,
                                                  mm.M2IO_BLOCK_WILDCARD_CT,
                                                  parse_list, True)
            ct = infrastructure.create_structure(0)

            # Title is a special case - we should store that with the CT:
            datanames = ['s_m_title']
            try:
                ret = mm.m2io_get_string(self.fh, datanames)
                mm.mmct_ct_set_title(ct.getHandle(), ret[0])
            except KeyError:
                # No title in this block; leave the CT title unset.
                pass

            ur = mm.m2io_new_unrequested_handle(self.fh)
            mm.mmct_ct_m2io_set_unrequested_handle(ct.getHandle(), ur)
            st = TextualStructure(ct, txt)
            mm.m2io_leave_block(self.fh)
        except mm.MmException as e:
            if e.rc == mm.M2IO_EOF:
                raise StopIteration()
            # Keep the library's error detail instead of discarding it.
            raise Exception(
                f"Could not read the next structure from file: {e}") from e

        return st

    def close(self):
        """
        Close the file.
        """
        # getattr() keeps this safe on a partially constructed instance.
        if getattr(self, "fh", None) is not None:
            mm.m2io_close_file(self.fh)
            self.fh = None
class MaestroReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a Maestro (M2io) format file.
    """

    # Make this setting a class variable so people can set it to M2IO_READ
    # if needed.
    read_mode = mm.M2IO_READ_FORWARD

    def __init__(self,
                 filename,
                 index=1,
                 error_handler=None,
                 input_string=None):
        """
        Initialize the reader.

        :type filename: string
        :param filename: The filename to read.

        :type index: int
        :param index: The index of the first structure to read.

        :type error_handler: int
        :param error_handler: Accepted for interface compatibility. The
            value is not used by this constructor; the handler is taken
            from `getErrorHandler` instead.

        :type input_string: string
        :param input_string: A string with the contents of a Maestro format
            file. If provided, the filename argument is ignored.

        :raise ValueError: if neither filename nor input_string is given.
        :raise TypeError: if input_string is given but is not a `str`.
        """
        self.error_handler = self.getErrorHandler()
        self._index = index

        mm.m2io_initialize(self.error_handler)
        mm.mmct_initialize(self.error_handler)
        # The file/buffer is opened lazily by _open().
        self.fh = None
        self.filename = filename
        self.input_string = input_string

        if not filename and not input_string:
            raise ValueError(
                "Neither filename nor input text for MaestroReader is given")

        if self.input_string and not isinstance(self.input_string, str):
            t = type(self.input_string)
            raise TypeError(
                f"input_string of {self.__class__} needs to be type(str) but is {t}"
            )

    def __del__(self,
                _mmct_terminate=mm.mmct_terminate,
                _m2io_terminate=mm.m2io_terminate):
        # The terminate functions are bound as default arguments so they are
        # still reachable if module globals are gone at interpreter shutdown.
        self.close()
        _mmct_terminate()
        _m2io_terminate()

    # required for iterator support
    def __iter__(self):
        return self

    def getErrorHandler(self):
        """
        Return the error handler to use for this reader.

        If the m2io library refcount is > 0, the handler currently in use by
        m2io is returned. Otherwise mm.MMERR_DEFAULT_HANDLER is returned.
        """
        if mm.m2io_refcount() > 0:
            return mm.m2io_get_errhandler()
        else:
            return mm.MMERR_DEFAULT_HANDLER

    def _open(self):
        """
        Internal function to open the file (or the in-memory input string)
        and record the m2io file type in `self.type`.

        :raise EOFError: if a file could not be opened and has zero size.
        :raise mm.MmException: for any other open failure.
        """
        try:
            if self.input_string:
                self.fh = mm.m2io_open_read_from_buffer(self.input_string)
            else:
                # Files whose name ends in "dat" are always opened with
                # M2IO_READ, regardless of the class-level read_mode.
                if self.filename.endswith("dat"):
                    read_mode = mm.M2IO_READ
                else:
                    read_mode = self.read_mode
                self.fh = mm.m2io_open_file(self.filename, read_mode)

            self.type = mm.m2io_get_file_type(self.fh)
        except mm.MmException:
            # Check to see if this is due to an empty file.
            if self.input_string:
                raise
            elif os.path.getsize(self.filename) == 0:
                raise EOFError(
                    "Could not open structure file due to zero size.")
            else:
                raise

    def seek(self, position):
        """
        Set the file position to the given position, opening the file first
        if necessary.

        This raises an exception for a zero size file.
        """
        if self.fh is None:
            self._open()
        self.last_position = position
        mm.m2io_set_file_pos(self.fh, position)

    def read(self, position=None):
        """
        Return the next Structure object.

        If position is given, the reader seeks there first. Otherwise the
        current position is used.

        :raise EOFError: on EOF or zero size file.
        :raise Exception: otherwise.
        """
        try:
            if position is not None:
                self.seek(position)
            return next(self)
        except StopIteration:
            raise EOFError("Could not read the next structure from file "
                           "due to EOF")

    def __next__(self):
        """
        Return the next Structure object from the file. Set
        self.last_position to the file offset just before it was read.

        :raise StopIteration: on EOF or zero size file.
        :raise mm.MmException or Exception: otherwise.
        """
        if self.fh is None:
            # First iteration; open the file:
            try:
                self._open()
            except EOFError:
                raise StopIteration()

            if self._index > 1:
                # Position on the CT block just before the first one wanted.
                mm.m2io_goto_block(self.fh, mm.M2IO_BLOCK_WILDCARD_CT,
                                   (self._index - 1))
                mm.m2io_leave_block(self.fh)

        if self.type == mm.M2IO_DISK_FILE or self.type == mm.M2IO_STRING:
            # File position is not recorded for other file types. No
            # exception is raised here for them; instead self.last_position
            # is simply never set, so reading it raises AttributeError.
            self.last_position = mm.m2io_get_file_pos(self.fh)

        try:
            mm.m2io_goto_next_block(self.fh, mm.M2IO_BLOCK_WILDCARD_CT)
            ct = Structure(mm.mmct_ct_m2io_get(self.fh))
        except mm.MmException as e:
            if e.rc == mm.M2IO_EOF:
                raise StopIteration()
            raise Exception(
                f"Could not read the next structure from file: {e}") from e

        return ct

    def close(self):
        """
        Close the file.
        """
        # getattr() keeps this safe on a partially constructed instance.
        if getattr(self, "fh", None) is not None:
            mm.m2io_close_file(self.fh)
            self.fh = None
class OptionError(Exception):
    """
    A parent exception class to indicate an error in setting an option.
    """


class UnsupportedOption(OptionError):
    """
    An exception class to indicate an attempt to set an option that is not
    supported.
    """

    def __init__(self, option_name, class_name):
        super(UnsupportedOption, self).__init__("The '%s' option is not "
                                                "supported by '%s'." %
                                                (option_name, class_name))


class UnsupportedOptionValue(OptionError):
    """
    An exception class to indicate an attempt to set an option to a value
    that is not supported.
    """

    def __init__(self, option_name, option_value, class_name):
        super(UnsupportedOptionValue, self).__init__(
            "The '%s' value for "
            "the '%s' option is not supported by '%s'." %
            (option_value, option_name, class_name))


class _BaseWriter(_ReaderWriterContextManager):
    """
    This class provides a common implementation for structure writers.
    """

    def setOption(self, option, value):
        """
        Set a single option for this writer. This method is meant for
        options that may not be supported for all writer formats. See the
        `StructureWriter` class documentation for details on the available
        options.

        Raises an OptionError subclass (either UnsupportedOption or
        UnsupportedOptionValue) if unsuccessful.

        :type option: str
        :param option: The name of the option to set.

        :param value: The value for the option. The data type of this
            parameter depends on the option being set.
        """
        # This default implementation always raises an UnsupportedOption
        # exception. Override in the subclass to support option setting.
        # UnsupportedOption takes (option_name, class_name); `value` is
        # deliberately not passed.
        raise UnsupportedOption(option, self.__class__.__name__)

    def _initFilename(self, filename, overwrite=True):
        """
        Save filename as absolute path to make sure relative paths are
        always relative to the cwd when the Writer is created.
        (PYTHON-934)

        :type filename: str
        :param filename: The filename to write to.

        :type overwrite: bool
        :param overwrite: If True, an existing file of that name is removed.
        """
        # retrieving the absolute path and extending Windows OS path with tag
        # if number of characters in the path > 259
        self.filename = fileutils.extended_windows_path(filename,
                                                        only_if_required=True)

        if overwrite and os.path.isfile(filename):
            # Don't use force_remove here; if write permissions are removed
            # we want to honor them.
            os.remove(filename)
class MaestroWriter(_BaseWriter):
    """
    A class for more efficient appending of a large number of structures to
    a single maestro structure file.

    For writing single structures, just use the Structure.write method.
    For appending a small (less than a thousand) number of structures, the
    Structure.append method will perform acceptably.
    """

    # Timings suggest a 5-10% speedup in write times on local disk (for 2500
    # drug-like structure with a write time of about 5s) when compared with
    # multiple structure.append() calls.
    #
    # For NFS mounted dirs (on /home in nyc) timings showed approximately a
    # 30% speedup. (For 2500 structures write time goes from 17 to 11s, for
    # 10000 structures write time goes from 63s to 46s.)

    def __init__(self, filename, overwrite=True):
        """
        Initialize needed mmlibs and record the file 'filename'. The file
        itself is not opened until the first `append` call.

        Note that the file will not be completely written until it is
        explicitly closed or the object is garbage collected.

        :type filename: str
        :param filename: The filename to write to.

        :type overwrite: bool
        :param overwrite: If False, append to an existing file if it exists.
        """
        self.fh = None
        # Initialize the library before touching the filesystem: __del__
        # always calls m2io_terminate, so a failure while removing an
        # existing file must not leave terminate without a matching
        # initialize.
        mm.m2io_initialize(mm.error_handler)
        self._initFilename(filename, overwrite=overwrite)

    def append(self, ct):
        """
        Append the provided structure to the open mae file. Set
        self.last_position to the file offset just before it was appended.

        The use of this class and method should be preferred for large
        numbers of structures (say, >1000), but for smaller numbers of
        structures you can use the Structure.append method directly.
        """
        # Don't call the open until an append action is taken. This avoids
        # the creation of a maestro file with just the s_m_m2io_version
        # block in the situation where append is never called.
        if self.fh is None:
            self.fh = mm.m2io_open_file(self.filename, mm.M2IO_APPEND)

        # NOTE(review): the nesting of this call was reconstructed from a
        # flattened listing; it is assumed to run on every append - confirm.
        ct.closeBlockIfNecessary(self.fh)

        # Call the method to put a structure to the mmct file. This
        # allows the Structure and TextualStructure objects to do their own
        # things:
        self.last_position = mm.m2io_get_file_pos(self.fh)
        ct.putToM2ioFile(self.fh)

    def close(self):
        """
        Close the file.
        """
        # getattr() keeps this safe on a partially constructed instance.
        if getattr(self, "fh", None) is not None:
            mm.m2io_close_file(self.fh)
            self.fh = None

    def __del__(self, _m2io_terminate=mm.m2io_terminate):
        """
        Close the file and terminate the mmlibs.
        """
        self.close()
        _m2io_terminate()
class SDWriter(_BaseWriter):
    """
    A class for more efficient appending of a large number of structures to
    a single SD structure file.

    For writing single structures, just use the Structure.write method.
    For appending a small (less than a thousand) number of structures, the
    Structure.append method will perform acceptably.
    """

    # subclass str to allow passing in str values (legacy)
    class Options(str, enum.Enum):
        stereo = 'stereo'
        assume_3d = 'assume_3d'  # bool; if never set, no option is applied
        write_v3000 = 'write_v3000'  # default is False

    def __init__(self, filename, overwrite=True):
        """
        Initialize needed mmlibs and record the file 'filename'. The file
        itself is not opened until the first `append` call.

        Note that the file will not be completely written until it is
        explicitly closed or the object is garbage collected.

        :type filename: str
        :param filename: The filename to write to.

        :type overwrite: bool
        :param overwrite: If False, append to an existing file if it exists.
        """
        self.fh = None
        # Initialize the library before touching the filesystem: __del__
        # always calls mmmdl_terminate, so a failure while removing an
        # existing file must not leave terminate without a matching
        # initialize.
        mm.mmmdl_initialize(mm.error_handler)

        # Save filename as absolute path to make sure relative paths are
        # always relative to the cwd when the SDWriter is created. (EV 72534)
        self.filename = os.path.abspath(filename)

        if overwrite and os.path.isfile(filename):
            os.remove(filename)

        # None means "not set": append() then applies no explicit option.
        self.stereo = None
        self.assume_3d = None
        self.write_v3000 = None

    def setOption(self, option, value):
        """
        Set an option not supported for all StructureWriter formats.

        The supported options for SDWriter are:

        * SDWriter.Options.stereo:
          NO_STEREO, STEREO_FROM_ANNOTATION, STEREO_FROM_ANNOTATION_AND_GEOM
        * SDWriter.Options.assume_3d:
          True, False. If never set, neither MMMDL_ASSUME_3D nor
          MMMDL_DONT_ASSUME_3D is applied.
          NOTE(review): earlier documentation disagreed on whether the
          resulting default is True or False - confirm against mmmdl.
        * SDWriter.Options.write_v3000:
          True, False (default, only write V3000 for large structures)

        :raise UnsupportedOption: if `option` is not one of the above.
        :raise UnsupportedOptionValue: if `value` is not valid for `option`.
        """
        option_enum = type(self).Options
        # list() so that plain str option names compare by value.
        if option not in list(option_enum):
            raise UnsupportedOption(option, type(self).__name__)

        if option == option_enum.stereo:
            options = {o for o in stereo_options if o != STEREO_FROM_GEOMETRY}
            if value in options:
                self.stereo = value
                return
        else:
            if value in (True, False):
                # Normalize to a real bool: append() tests assume_3d with
                # `is True` / `is False`, so an equal-but-not-identical
                # value such as 1 would otherwise be accepted and ignored.
                flag = bool(value)
                if option == option_enum.assume_3d:
                    self.assume_3d = flag
                elif option == option_enum.write_v3000:
                    self.write_v3000 = flag
                return

        raise UnsupportedOptionValue(option, value, type(self).__name__)

    def append(self, ct):
        """
        Append the provided structure to the open file.

        :raise Exception: if `ct` is a TextualStructure.
        """
        # First check CT is able to be written. TextualStructure objects
        # are not:
        if isinstance(ct, TextualStructure):
            raise Exception("TextualStructure objects can not be written to "
                            "an SD format file")

        if self.fh is None:
            self.fh = mm.mmmdl_new(self.filename, mm.MMMDL_APPEND)

            # NOTE(review): nesting reconstructed from a flattened listing;
            # the options are assumed to be applied once, when the file is
            # first opened - confirm.
            if self.stereo == NO_STEREO:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_NO_STEREO)
            elif self.stereo == STEREO_FROM_ANNOTATION_AND_GEOM:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_STEREO)
            elif self.stereo == STEREO_FROM_ANNOTATION:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_STEREO_BY_ANNOTATION)

            if self.assume_3d is True:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_ASSUME_3D)
            elif self.assume_3d is False:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_DONT_ASSUME_3D)

            if self.write_v3000:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_WRITE_V3000)

        mm.mmmdl_sdfile_put_ct(self.fh, ct)

    def close(self):
        """
        Close the file.
        """
        # getattr() keeps this safe on a partially constructed instance.
        if getattr(self, "fh", None) is not None:
            mm.mmmdl_delete(self.fh)
            self.fh = None

    def __del__(self, _mmmdl_terminate=mm.mmmdl_terminate):
        """
        Close the file and terminate the mmlibs.
        """
        self.close()
        _mmmdl_terminate()
class Mol2Writer(_BaseWriter):
    """
    Mol2 support for the StructureWriter class.
    """

    def __init__(self, filename, overwrite=True):
        """
        Initialize needed mmlibs and record the file 'filename'. The file
        itself is not opened until the first `append` call.

        :type filename: str
        :param filename: The filename to write to.

        :type overwrite: bool
        :param overwrite: If False, append to an existing file if it exists.
        """
        self.fh = None
        self.error_handler = mm.error_handler
        # Initialize the library before touching the filesystem: __del__
        # always calls mmmol2_terminate, so a failure while removing an
        # existing file must not leave terminate without a matching
        # initialize.
        mm.mmmol2_initialize(self.error_handler)

        # Stored as an absolute path so later cwd changes do not redirect
        # the output.
        self.filename = os.path.abspath(filename)

        if overwrite and os.path.isfile(filename):
            os.remove(filename)

    def append(self, st):
        """
        Append the provided structure to the file.

        :raise Exception: if `st` is a TextualStructure.
        """
        # First check CT is able to be written. TextualStructure objects
        # are not:
        if isinstance(st, TextualStructure):
            raise Exception("TextualStructure objects can not be written to "
                            "a mol2 format file.")

        # Open lazily so that no file is created if nothing is appended.
        if self.fh is None:
            self.fh = mm.mmmol2_new(self.filename, mm.MMMOL2_APPEND)

        mm.mmmol2_put_ct(self.fh, st)

    def close(self):
        """
        Close the file.
        """
        # getattr() keeps this safe on a partially constructed instance.
        if getattr(self, "fh", None) is not None:
            mm.mmmol2_delete(self.fh)
            self.fh = None

    def __del__(self, _mmmol2_terminate=mm.mmmol2_terminate):
        """
        Close the file and terminate the mmlibs.
        """
        self.close()
        _mmmol2_terminate()
class PDBWriter(_BaseWriter):
    """
    A class for writing PDB-formatted files. Only one structure can be
    written to a PDB file. While this class offers no speed increase over the
    Structure.write() method, it provides more options.
    """

    def __init__(self,
                 filename,
                 reorder_by_sequence=False,
                 first_occ=False,
                 translate_pdb_resnames=True):
        """
        Record the output file and writing options. The file is written by
        `write` (or `append`); `close` does nothing.

        :type filename: str
        :param filename: The filename to write to.

        :type reorder_by_sequence: bool
        :param reorder_by_sequence: Whether to re-order the residues by
            sequence before writing the PDB file.

        :type first_occ: bool
        :param first_occ: If True and there are alternate occupancy sites,
            only the first occupancy site will be included in the output
            PDB file. Otherwise, all occupancy sites will be included.

        :type translate_pdb_resnames: bool
        :param translate_pdb_resnames: If True, the pdb residue names get
            converted to a standard set. If False, the translation is turned
            off.

        NOTE: Any existing file will be overwritten when the class instance
        is created.
        """
        self._reorder_by_sequence = reorder_by_sequence
        self.first_occ = first_occ
        self.translate_pdb_resnames = translate_pdb_resnames

        self._initFilename(filename)
        self._num_structures_written = 0

    def write(self, ct):
        """
        Write the provided structure to the PDB file.

        :raise RuntimeError: if a structure was already written by this
            writer.
        :raise Exception: if `ct` is a TextualStructure.
        """
        if self._num_structures_written > 0:
            raise RuntimeError(
                "Cannot write more than one structure to PDB file.")

        # First check CT is able to be written. TextualStructure objects
        # are not:
        if isinstance(ct, TextualStructure):
            raise Exception("TextualStructure objects can not be written to "
                            "an PDB format file")

        # NOTE(review): mmpdb is initialized here on every write and this
        # class has no matching mmpdb_terminate - confirm that is intended.
        mm.mmpdb_initialize(mm.error_handler)
        fh = mm.mmpdb_new()
        try:
            if self._reorder_by_sequence:
                mm.mmpdb_set(fh, mm.MMPDB_REORDER_BY_SEQUENCE)
            if self.first_occ:
                mm.mmpdb_set(fh, mm.MMPDB_FIRST_OCC)
            if not self.translate_pdb_resnames:
                mm.mmpdb_set(fh, mm.MMPDB_NO_TRANSLATE_PDB_RESNAMES)

            mm.mmpdb_write(fh, ct, self.filename)
        finally:
            # Release the handle even if setting options or writing failed.
            mm.mmpdb_delete(fh)

        self._num_structures_written += 1

    def append(self, ct):
        """
        Alias to the write() method (for consistency with the other Writer
        classes).
        """
        self.write(ct)

    def close(self):
        """
        Does nothing. Added for consistency with other Writer classes.
        """
@contextlib.contextmanager
def _add_pdb_pbc_properties(st):
    """
    Within a scope, adds the PDB-like PBC properties to a structure if there
    is a way to determine the PBC data for the structure.

    On exit, properties that were newly created are removed again and
    properties that already existed but were overwritten get their previous
    values back. If no PBC can be constructed for the structure, the
    structure is left untouched.

    :type st: `schrodinger.Structure`
    :param st: Structure to be updated within a context.
    """

    def get_HM_space_group(st):
        """Use space group without spaces if it fits into PDB spec.
        Structure property will be updated.

        :return str: Return original space group name
        """
        from schrodinger.application.matsci.nano import space_groups

        spgname_original = st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP]
        spgobj = space_groups.get_spacegroups().getSpgObjByName(
            spgname_original)
        if spgobj:
            # Short name is HM name (MATSCI-9091)
            spgname = spgobj.space_group_short_name
            if len(spgname) > mm.M2IO_PDB_SPG_NAME_MAX_LEN:
                spgname = spgname.replace(' ', '')
            st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP] = spgname
        return spgname_original

    try:
        pbc = infrastructure.PBC(st)
    except Exception:
        # No PBC can be determined for this structure. Catching Exception
        # (not everything) lets KeyboardInterrupt/SystemExit propagate.
        pbc = None

    if pbc is None:
        # Yield outside the handler so that an error raised by the with-body
        # is not chained onto the PBC construction failure.
        yield
        return

    added = set()
    overwritten = {}
    original_spgname = None

    cell_properties = infrastructure.LENGTHS_AND_ANGLES_PROPERTIES
    if not all(st.property.get(n) for n in cell_properties):
        # update all if any are missing to ensure that they are consistent
        # NOTE(review): the values are passed as angles followed by lengths;
        # confirm this matches the order of LENGTHS_AND_ANGLES_PROPERTIES.
        for name, value in zip(cell_properties,
                               pbc.getBoxAngles() + pbc.getBoxLengths()):
            if name in st.property:
                # Remember the caller's value so it can be put back on exit
                # instead of being deleted.
                overwritten[name] = st.property[name]
            st.property[name] = value
            added.add(name)

    # NOTE(review): the nesting of this call was reconstructed from a
    # flattened listing; it is assumed to run unconditionally - confirm.
    pbc.applyToStructure(st)

    if mm.M2IO_PDB_CRYSTAL_SPACE_GROUP in st.property:
        original_spgname = get_HM_space_group(st)
    else:
        st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP] = mm.P1_SPACE_GROUP
        added.add(mm.M2IO_PDB_CRYSTAL_SPACE_GROUP)

    if mm.M2IO_PDB_CRYSTAL_Z not in st.property:
        st.property[mm.M2IO_PDB_CRYSTAL_Z] = 1
        added.add(mm.M2IO_PDB_CRYSTAL_Z)

    try:
        yield
    finally:
        for p in added:
            del st.property[p]
        for name, value in overwritten.items():
            st.property[name] = value
        if original_spgname:
            st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP] = original_spgname


class MMCIFWriter(_BaseWriter):
    """
    Write a structure to macromolecular cif aka pdbx format. Suitable for use
    with applications that expect the cif format used by the RCSB PDB, for
    instance.

    Can be read by the Schrodinger .cif reader. Currently uses openbabel
    and .pdb as a shim.
    """
    # Output-format switch handed to obabel; subclasses override it.
    _FMT = '-ommcif'

    def __init__(self, filename):
        """
        :type filename: str
        :param filename: Name of file to which structures should be written
        """
        self._initFilename(filename)

    def write(self, st):
        """
        Write a Structure to the file specified in the constructor.

        The structure is first written to a temporary .pdb file, which is
        then converted by the `obabel` executable. The temporary file is
        removed afterwards.

        :type st: `schrodinger.Structure`
        :param st: Structure to write to a file

        :raise RuntimeError: if the text 'err' appears (case-insensitively)
            in the obabel output.
        """
        # Only the name is needed; the handle is closed right away.
        with tempfile.NamedTemporaryFile(suffix='.pdb', delete=False) as tf:
            name = tf.name
        try:
            with PDBWriter(name) as writer:
                with _add_pdb_pbc_properties(st):
                    writer.append(st)

            # otherwise the title is the name of the temporary file
            output = subprocess.check_output([
                'obabel', '-ipdb', name, '--title', st.title, self._FMT, '-O',
                self.filename
            ],
                                             stderr=subprocess.STDOUT)
            # The pdb to cif route seems to always return a 0 exit code, so
            # read the text output.
            if 'err' in output.decode().lower():
                raise RuntimeError(output.decode())
        finally:
            fileutils.force_remove(name)

    def append(self, st):
        """
        Not supported; always raises.

        :raise AttributeError: always.
        """
        raise AttributeError(
            ".cif files store a single structure, append is not allowed")


class CIFWriter(MMCIFWriter):
    """
    Write a structure to small-molecule cif format. Suitable for use
    with applications that expect the cif format used by the Cambridge
    Crystallographic Database, for instance.

    Can be read by the Schrodinger .cif reader. Currently uses openbabel
    and .pdb as a shim.
    """
    _FMT = '-ocif'
class StructureWriter(_ReaderWriterContextManager):
    """
    A class for efficient writing of multiple structures to a single
    structure file. If you are writing a single structure, you can more
    easily use the `Structure.write` method.

    Options that are not supported for all formats can be set with the
    setOption method, for example::

        writer = StructureWriter(filename)
        try:
            writer.setOption("stereo", STEREO_FROM_ANNOTATION)
        except OptionError:
            # take action based on unsupported option/value here

    Currently, the following options are available:

    - `stereo`
        - This option controls how stereochemical properties are written. It
          does not affect the output geometry.
        - This option is supported for `SD`, `SMILES`, and `SMILESCSV`,
          although not all options are supported for `SD`.
        - Option values are `NO_STEREO`, `STEREO_FROM_ANNOTATION_AND_GEOM`,
          `STEREO_FROM_ANNOTATION`, and `STEREO_FROM_GEOMETRY`.
        - The default value is `STEREO_FROM_ANNOTATION_AND_GEOM`.
        - With `STEREO_FROM_ANNOTATION_AND_GEOM`, current annotation
          properties of the Structure are used when present. Chiral atoms
          without annotation properties will have their stereochemistry
          determined from geometry (if possible) and will be written with
          definite stereochemical configuration.
        - With `NO_STEREO`, no stereochemical information will be written.
        - With `STEREO_FROM_ANNOTATION`, stereochemical information will be
          written based only on the current annotations. Use this option to
          allow for specification of stereochemistry on some centers while
          leaving others undefined. This should be faster than identifying
          stereochemistry from the 3D geometry.
        - With `STEREO_FROM_GEOMETRY`, stereochemistry will be written for
          all chiral atoms based on the 3D geometry. This option is not
          supported for `SD` format.
    """

    def __init__(self, filename, overwrite=True, format=None, stereo=None):
        """
        Create a structure writer class based on the format.

        :type filename: str or pathlib.Path
        :param filename: The filename to write to.

        :type overwrite: bool
        :param overwrite: If False, append to an existing file instead of
            overwriting it.

        :type format: str
        :param format: The format of the file. Values should be specified by
            one of the module-level constants MAESTRO, MOL2, SD, SMILES, or
            SMILESCSV. If the format is not explicitly specified it will be
            determined from the suffix of the filename. Multi-structure PDB
            files are not supported.

        :type stereo: enum
        :param stereo: Use of the stereo option in the constructor is
            pending deprecation. Please use the setOption method instead.
            Only the SMILES and SMILESCSV writers receive this value; for
            the other formats it triggers the warning and is otherwise
            unused. See the class docstring for documentation on the stereo
            options.

        :raise ValueError: if the format is not supported, or if
            overwrite=False is requested for PDB or SMILESCSV output.
        """
        filename = str(filename)
        format = _check_format(filename, format)

        if stereo is not None:
            warnings.warn(
                "Use of the stereo option in the constructor is "
                "deprecated. Please use either the setOption method "
                "instead.",
                PendingDeprecationWarning,
                stacklevel=2)

        # Pick the format-specific writer that does the actual work.
        if format == PDB:
            if not overwrite:
                raise ValueError("PDB-formatted files can not be appended to")
            self.writer = PDBWriter(filename)
        elif format == SD:
            self.writer = SDWriter(filename, overwrite)
        elif format == MAESTRO:
            self.writer = MaestroWriter(filename, overwrite)
        elif format == SMILES:
            self.writer = SmilesWriter(filename, overwrite, stereo)
        elif format == SMILESCSV:
            if not overwrite:
                raise ValueError(
                    "Smiles CSV-formatted files can not be appended to")
            self.writer = SmilesCsvWriter(filename, stereo)
        elif format == MOL2:
            self.writer = Mol2Writer(filename, overwrite)
        elif format == XYZ:
            self.writer = infrastructure.StructureWriter.getWriter(
                filename, overwrite)
        else:
            raise ValueError(
                "'%s' format is not supported by the StructureWriter" % format)

        self.filename = filename  # So that the user can easily get filename
        self.written_count = 0  # The number of structures written

    def append(self, ct):
        """
        Append the provided structure to the open file.
        """
        self.writer.append(ct)
        self.written_count += 1

    def extend(self, cts):
        """
        Append all provided structures to the open file.
        """
        for ct in cts:
            self.writer.append(ct)
            self.written_count += 1

    def close(self):
        """
        Close the file.
        """
        self.writer.close()

    def setOption(self, option, value):
        # Avoid duplicating the docstring via __doc__ assignment below; see
        # _BaseWriter.setOption.
        self.writer.setOption(option, value)

    setOption.__doc__ = _BaseWriter.setOption.__doc__

    @staticmethod
    def write(st, filename):
        """
        Writes the given Structure to the specified file, overwriting the
        file if it already exists.

        :param st: structure object to write to file
        :type st: structure.Structure

        :param filename: filename to write to
        :type filename: str or pathlib.Path
        """
        with StructureWriter(filename) as writer:
            writer.append(st)
class PDBReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a PDB format file.
    """

    def __init__(self,
                 filename,
                 index=1,
                 error_handler=None,
                 all_occ=True,
                 use_strict_resname=False):
        """
        Initialize with a filename and an optional starting index
        (default of 1).

        :type filename: str
        :param filename: The filename to read.

        :type index: int
        :param index: The index of the first model to read.

        :type error_handler: int
        :param error_handler: Accepted for interface compatibility. The
            value is not used by this constructor; the handler is taken
            from `getErrorHandler` instead.

        :type all_occ: bool
        :param all_occ: Whether to include alternative positions
            (default=True).

        :type use_strict_resname: bool
        :param use_strict_resname: Limit the residue name to 18-20 columns
            of pdb record.
        """
        self.error_handler = self.getErrorHandler()
        self.fh = None
        mm.mmpdb_initialize(self.error_handler)

        self.fh = mm.mmpdb_new()
        if all_occ:
            mm.mmpdb_set(self.fh, mm.MMPDB_ALL_OCC)
        else:
            mm.mmpdb_set(self.fh, mm.MMPDB_FIRST_OCC)
        if use_strict_resname:
            mm.mmpdb_set(self.fh, mm.MMPDB_STRICT_RESNAME)

        mm.mmpdb_open(self.fh, filename, "r")
        self.current_model = index
        self.filename = filename

    def __del__(self, _mmpdb_terminate=mm.mmpdb_terminate):
        # The terminate function is bound as a default argument so it is
        # still reachable if module globals are gone at interpreter shutdown.
        self.close()
        _mmpdb_terminate()

    def close(self):
        """
        Close the file.
        """
        # getattr() keeps this safe on a partially constructed instance.
        if getattr(self, "fh", None) is not None:
            mm.mmpdb_delete(self.fh)
            self.fh = None

    # required for iterator support
    def __iter__(self):
        return self

    def _seek_current_model(self):
        """
        Move to model `self.current_model` when it is greater than 1.

        :rtype: bool
        :return: False if the seek ran into end of file, True otherwise.

        :raise mm.MmException: for any failure other than MMPDB_EOF.
        """
        with _suppress_error_reporting(self.getErrorHandler()):
            try:
                if self.current_model > 1:
                    mm.mmpdb_goto(self.fh, self.current_model)
            except mm.MmException as e:
                if e.rc == mm.MMPDB_EOF:
                    return False
                else:
                    raise
        return True

    def __next__(self):
        """
        Return the next Structure object from the file.

        :raise StopIteration: for an empty file or at end of file.
        :raise Exception: if the model could not be read; the underlying
            mm.MmException is chained as the cause.
        """
        # Capture the handler once so suppress and restore always act on the
        # same object, and suppress before entering the try so restore only
        # runs when suppress succeeded.
        handler = self.getErrorHandler()
        mm.mmerr_suppress_print(handler)
        try:
            if os.path.getsize(self.filename) == 0 \
                    or not self._seek_current_model():
                # Empty file, or reached end of file
                raise StopIteration()

            try:
                mm.mmpdb_read(self.fh, mm.MMPDB_OVERWRITE)
                pdb_ct = infrastructure.create_structure(0)
                mm.mmpdb_to_mmct(self.fh, pdb_ct.getHandle())
            except mm.MmException as e:
                if e.rc == mm.MMPDB_EOF:
                    raise StopIteration()
                raise Exception(
                    f"Could not read the next structure from file: {e}"
                ) from e
        finally:
            mm.mmerr_restore_print(handler)

        ret_ct = Structure(pdb_ct)
        self.current_model += 1
        return ret_ct

    def getErrorHandler(self):
        """
        Return the error handler to use for this reader.

        If the pdb library refcount is > 0, the handler currently in use by
        the pdb library is returned. Otherwise mm.MMERR_DEFAULT_HANDLER is
        returned.
        """
        if mm.mmpdb_refcount() > 0:
            return mm.mmpdb_get_errhandler()
        else:
            return mm.MMERR_DEFAULT_HANDLER
[docs]class SDReader(_ReaderWriterContextManager): """ A class for reading structures from a SD format file. """
[docs] def __init__(self, filename, index=1, error_handler=None, ignore_errors=False, input_string=None, import_sdprop_as_string=False, import_sdprop_per_file=True, ignore_structureless=True): """ Initialize the reader. :type filename: string :param filename: The filename to read. :type index: int :param index: The index of the first structure to read. :type error_handler: int :param error_handler: The handle of the mmerr object to use for error logging. Defaults to schrodinger.infra.mm.error_handler. :type ignore_errors: bool :param ignore_errors: If True, bad structures will be skipped instead of raising an exception. If False, the caller may set self._previous_structure to None to continue reading past the error. :type ignore_structureless: bool :param ignore_structureless: If False, exception will be raised for SD entries without atoms in case ignore_errors is also False (ignore_errors takes precedence, ignore_structureless makes atomless entries to be considered as errors). :type input_string: string :param input_string: A string with the contents of an SD format file. If provided, the filename argument is ignored. :type import_sdprop_as_string: bool :param import_sdprop_as_string: Import all properties as strings. Setting this to True speeds file reading. :type import_sdprop_per_file: bool :param import_sdprop_per_file: Setting this to True indicates that all structures in the file will have the same set of properties. If this can be guaranteed, it speeds file reading. 
""" self.error_handler = self.getErrorHandler() self.fh = None mm.mmmdl_initialize(self.error_handler) if input_string: self.fh = mm.mmmdl_new_from_string(input_string) else: self.fh = mm.mmmdl_new(filename, mm.MMMDL_READ) if import_sdprop_as_string: mm.mmmdl_set_option(self.fh, mm.MMMDL_IMPORT_SDPROP_AS_STRING) elif import_sdprop_per_file: mm.mmmdl_set_option(self.fh, mm.MMMDL_IMPORT_SDPROP_PER_FILE) mm.mmmdl_sdfile_fix_prop_types(self.fh, "1:") # mm.MMMDL_STEREO is now the MMMDL default self.current_structure = index self._previous_structure = None self.ignore_errors = ignore_errors self.ignore_structureless = ignore_structureless self.structures_skipped = 0 # number of SD structures that were skipped
[docs] def getErrorHandler(self): """ Returns the error handler by querying the mmmdl library and if the refcount is > 0 then return the error handler that is in use by mmmdl. Otherwise None is returned. """ if mm.mmmdl_refcount() > 0: return mm.mmmdl_get_errhandler() else: return mm.MMERR_DEFAULT_HANDLER
def __del__(self, _mmmdl_terminate=mm.mmmdl_terminate):
    """
    Release the mmmdl file handle and terminate the mmmdl library
    reference when the reader is garbage collected.

    :param _mmmdl_terminate: Private. The terminate function is bound as a
        default argument when the method is defined; presumably this keeps
        it reachable even if the `mm` module global is no longer available
        when the object is destroyed -- TODO confirm.
    """
    # Delete the handle first, then terminate the library.
    self.close()
    _mmmdl_terminate()
def close(self):
    """
    Close the file by deleting the mmmdl handle, if one is open.

    Does nothing when there is no handle (never opened, or already
    closed).
    """
    if getattr(self, "fh", None) is None:
        return
    mm.mmmdl_delete(self.fh)
    self.fh = None
# required for iterator support
def __iter__(self):
    """
    Return the reader itself; it is its own iterator.
    """
    return self

def __next__(self):
    """
    Return the next Structure object from the file.

    Unreadable entries are skipped (and counted in
    self.structures_skipped) when self.ignore_errors is set, or when the
    entry has no structure and self.ignore_structureless is set.

    :raise StopIteration: When the end of the file is reached.
    :raise Exception: When an entry cannot be read and is not ignorable.
        self.current_structure has already been advanced, so calling
        __next__() again attempts the following entry.
    """
    # Loop instead of recursing so that an arbitrarily long run of bad
    # entries can be skipped without hitting the recursion limit.
    while True:
        try:
            previous = self._previous_structure
            if previous is None or previous != self.current_structure:
                # For performance reasons, only do a goto if this is the
                # first structure we read, or if someone changed
                # self.current_structure behind our back.
                mm.mmmdl_sdfile_goto(self.fh, self.current_structure)
            sd_ct = mm.mmmdl_sdfile_get_ct(self.fh)
        except mm.MmException as e:
            if e.rc == mm.MMMDL_EOF:
                raise StopIteration()

            # Could not read the next structure from SD file.
            # If __next__() gets called again, read the NEXT structure:
            self.current_structure += 1
            self._previous_structure = self.current_structure

            ignore = self.ignore_errors or (self.ignore_structureless and
                                            e.rc == mm.MMMDL_NOSTRUCTURE)
            if not ignore:
                raise Exception(
                    "Could not read the next structure from file") from e

            # Skip the bad structure.
            self.structures_skipped += 1
            # Will force a call to mmmdl_sdfile_goto() - Ev:123004
            self._previous_structure = None
            continue

        self.current_structure += 1
        self._previous_structure = self.current_structure
        return Structure(sd_ct)
class StructureReader(_ReaderWriterContextManager):
    """
    Read structures from files of various types. Example usage::

        # Read the first structure in a file:
        st = structure.StructureReader.read('myfile.pdb')

        # Read all structures from a file:
        for st in structure.StructureReader('myfile.sdf'):
            <do something with st>

        # Start reading at the second structure entry in the file
        for st in structure.StructureReader('myfile.sdf', index=2):
            <do something with st>

        # Assign iterator to a variable and read first 2 structures:
        st_reader = structure.StructureReader('myfile.mae')
        st1 = next(st_reader)
        st2 = next(st_reader)
    """

    def __init__(self, filename, index=1):
        """
        :param filename: Path of the file to read. An already constructed
            infrastructure.StructureReader is also accepted (used by
            fromString()).

        :type index: int
        :param index: Index of the first structure to read.

        :raise IOError: If filename is a path that is not an existing file.
        """
        backend = filename
        if not isinstance(backend, infrastructure.StructureReader):
            # A real path: validate it and let the infrastructure layer
            # pick the reader.
            filename = str(filename)
            if not os.path.isfile(filename):
                raise IOError("File does not exist: %s" % filename)
            backend = infrastructure.StructureReader.getReader(filename)
        # Otherwise: hack, a reader was passed as "filename"

        self.reader = backend
        if index != 1:
            self.reader.setIndex(index)

    def __iter__(self):
        """
        Return the reader itself; it is its own iterator.
        """
        return self

    def __next__(self):
        """
        Read the next entry from the underlying reader and wrap it in a
        Structure.
        """
        return Structure(self.reader.readNext())

    def setIndex(self, index):
        """
        Forward the given structure index to the underlying reader.
        """
        self.reader.setIndex(index)

    def close(self):
        """
        Drop the reference to the underlying reader.
        """
        self.reader = None

    @staticmethod
    def read(filename, index=1):
        """
        Read a single Structure from the given file.

        :param filename: filename to read from
        :type filename: str or pathlib.Path

        :param index: the positional index of the structure to read
        :type index: int

        :return: the structure at the given index (by default the first)
        :rtype: structure.Structure
        """
        path = str(filename)
        # When reading a single structure from SD, avoid precalculating
        # property types for all structures in the file, which might be
        # very slow.
        is_sd = _check_format(path) == SD
        make_reader = (partial(SDReader, import_sdprop_per_file=False)
                       if is_sd else StructureReader)
        with make_reader(path, index=index) as reader:
            return next(reader)

    @staticmethod
    def fromString(input_string, index=1, format=MAESTRO):
        """
        Create a reader iterator from an input string. This is only
        supported for Maestro and SD formats.

        :param input_string: the string representation of the Structure.
        :type input_string: str

        :param index: the index of the first structure to read.
        :type index: int

        :param format: the string format, either MAESTRO or SD.
        :type format: str

        :raise TypeError: If input_string is not a str.
        :raise NotImplementedError: For any other format.
        """
        if not isinstance(input_string, str):
            raise TypeError("Invalid type for input_string: "
                            f"{type(input_string)}")

        if format == MAESTRO:
            maestro_reader = infrastructure.MaestroReader.fromString(
                input_string)
            return StructureReader(maestro_reader, index=index)
        if format == SD:
            return SDReader(None, index=index, input_string=input_string)
        raise NotImplementedError("StructureReader.fromString() does not "
                                  f"support `{format}` format.")
def _check_format(filename, format=None):
    """
    Resolve the structure file format to use for filename.

    An explicitly given format (anything other than None) is returned
    unchanged. Otherwise the format is looked up from the filename via
    fileutils.get_structure_file_format().

    :raise ValueError: If no format can be determined for the filename.
    """
    if format is not None:
        return format

    detected = fileutils.get_structure_file_format(filename)
    if detected is None:
        raise ValueError(f"Unsupported file extension for file {filename}")
    return detected
def write_cts(sts, filename):
    """
    Write several structures to one file.

    :param sts: An iterable containing the structures to write
    :type sts: iter

    :param filename: The filename to write the structures to. File format
        will be determined from the filename suffix.
    :type filename: str
    """
    with StructureWriter(filename) as st_writer:
        st_writer.extend(sts)
def count_structures(filename):
    """
    Return the number of structures in the specified file.

    For PDB files, returns the number of MODELs. For SMILES files, blank
    lines are not counted. For SMILES CSV files, blank rows are not counted
    and the first row is treated as a header only if one of its fields is
    "smiles" (case-insensitive), which matches what SmilesCsvReader yields.

    :param filename: Path of the file to inspect; the format is derived
        from the file name.

    :raise IOError: If the file does not exist.
    :raise ValueError: If the file format is not supported.

    :rtype: int
    """
    file_format = _check_format(filename)

    if not os.path.isfile(filename):
        raise IOError("File does not exist: %s" % filename)

    if file_format in (MAESTRO, PHASE_HYPO, SD, MOL2, PDB, CIF):
        return infrastructure.StructureReader.countStructures(filename)

    if file_format == SMILES:
        with _get_file_handle(filename) as fh:
            return sum(1 for line in fh if line.strip())

    if file_format == SMILESCSV:
        # Use the Python csv module to count rows, as each row can span
        # multiple lines:
        with csv_unicode.reader_open(filename) as fh:
            rows = csv.reader(fh)
            first_row = next(rows, None)
            if first_row is None:
                # Empty file
                return 0
            has_header = any(
                _unquote_string(field).lower() == 'smiles'
                for field in first_row)
            # csv.DictReader (used by SmilesCsvReader) skips blank rows.
            num_structures = sum(1 for row in rows if row)
            if first_row and not has_header:
                # Header-less file: the first row is a structure too.
                num_structures += 1
            return num_structures
        # FIXME use ChmDelimitedPatterns.calculateRowCount() instead?

    # Otherwise invalid format
    raise ValueError(f"Unsupported file extension: {filename}")
class TextualStructure(Structure):
    """
    A sub-class of Structure for use when reading from a Maestro format file
    and only the structure-level properties are needed. The actual atom and
    bond records are not parsed from the file and so can't actually be
    accessed. The only things possible with this type of Structure are to
    access the structure level properties or to write it out unchanged to a
    file. Attempts to access the atom or bond data, directly or indirectly,
    will raise an exception.

    The only useful way to create a TextualStructure object is via the
    MaestroTextReader.
    """

    def __init__(self, ct, txt):
        """
        Initialize the TextualStructure object. The Structure handle will
        usually have no atoms but will have an unrequested data handle
        associated with it which can be used to access the Structure-level
        properties. 'txt' should be the full textual representation of the
        f_m_ct block as read from the Maestro format file.
        """
        # Let the base class take care of the handle.
        Structure.__init__(self, ct)
        self._text_rep = txt

    def __str__(self):
        """
        Return the stored text of the structure block.
        """
        return self._text_rep

    # Atom, molecule, chain, residue and ring access is redefined so that
    # any attempt to use it raises AttributeError.

    @property
    def atom(self):
        raise AttributeError(
            "It is not possible to access atoms for TextualStructure objects")

    @property
    def atom_total(self):
        raise AttributeError(
            "It is not possible to access atoms for TextualStructure objects")

    @property
    def molecule(self):
        raise AttributeError(
            "It is not possible to access molecules for TextualStructure "
            "objects")

    @property
    def chain(self):
        raise AttributeError(
            "It is not possible to access chains for TextualStructure objects")

    @property
    def residue(self):
        raise AttributeError("It is not possible to access residues for "
                             "TextualStructure objects")

    @property
    def ring(self):
        raise AttributeError(
            "It is not possible to access rings for TextualStructure objects")

    @property
    def property(self):
        """
        Dictionary-like container of structure properties. Keys are strings
        of the form `type_family_name` as described in the `PropertyName`
        documentation.

        :note: Unlike the `Structure.property` dictionary, this dictionary
            is read-only.
        """
        # Created on first access and cached afterwards.
        if self._property is None:
            self._property = _StructureProperty(self, read_only=True)
        return self._property

    def _write_ct_as_text(self, filename, mode=mm.M2IO_WRITE):
        """
        Put the stored text block into a Maestro format file opened with
        the given m2io mode.
        """
        m2io_handle = mm.m2io_open_file(filename, mode)
        try:
            # From 62436
            # We may need to close the top-level block. If the file
            # has just been opened then we'll need to close the header
            # block. Turn off error handling as there'll be an error if the
            # block wasn't actually open:
            with _suppress_error_reporting(mm.error_handler):
                with contextlib.suppress(mm.MmException):
                    mm.m2io_close_block(m2io_handle)
            mm.m2io_put_text_block(m2io_handle, str(self))
        finally:
            mm.m2io_close_file(m2io_handle)

    def write(self, filename, format=None):
        """
        Write the structure to a file, overwriting any previous content.
        File will only be written to Maestro format.

        :raise Exception: If the (given or detected) format is not Maestro.
        """
        # _check_format() hands back an explicit format unchanged.
        fmt = _check_format(filename, format)
        if fmt != 'maestro':
            raise Exception("Textual Structure objects can only be written to "
                            "Maestro format files.")
        self._write_ct_as_text(filename, mm.M2IO_WRITE)

    def append(self, filename, format=None):
        """
        Append the structure to the file. File will only be written to
        Maestro format.

        :raise Exception: If the (given or detected) format is not Maestro.
        """
        # _check_format() hands back an explicit format unchanged.
        fmt = _check_format(filename, format)
        if fmt != 'maestro':
            raise Exception("Textual Structure objects can only be written to "
                            "Maestro format files.")
        self._write_ct_as_text(filename, mm.M2IO_APPEND)

    def putToM2ioFile(self, filehandle):
        """
        Used by the Maestro writer - put a single structure to the
        (already open) filehandle
        """
        mm.m2io_put_text_block(filehandle, self._text_rep)

    def closeBlockIfNecessary(self, filehandle):
        """
        Used by the Maestro writer to leave the header block: closes the
        m2io block that is currently open on filehandle.
        """
        mm.m2io_close_block(filehandle)

    def getStructure(self):
        """
        Return a Structure object for this TextualStructure by parsing the
        internal text representation into an mmct.
        """
        with MaestroReader("", input_string=self._text_rep) as text_reader:
            return next(text_reader)

    @staticmethod
    def read(filename):
        """
        Read the first structure from a Maestro file. TextualStructure will
        only read from files in Maestro format.

        :raise ValueError: If the file is not in Maestro format.
        """
        if _check_format(filename) != MAESTRO:
            raise ValueError("TextualStructure can only read from "
                             "Maestro format files.")
        with MaestroTextReader(filename) as text_reader:
            return next(text_reader)
class SmilesStructure(object):
    """
    SMILES representation of a Structure that is returned by SmilesReader
    and SmilesCsvReader. When written to a SMILES-formatted file, properties
    other than the title are not retained.

    When the USE_RDKIT_FOR_SMILESSTRUCTURE feature flag is enabled, CXSMILES
    are supported (the extension string is part of the 'smiles' member).
    """
    mmsmiles_initialized = False

    def __init__(self, pattern, properties=None):
        """
        :type pattern: str
        :param pattern: The SMILES string.

        :param properties: Either a dict of properties (copied), or, for
            backwards compatibility, a plain string that is used as the
            title. When empty or None the title is set to "".
        """
        self.smiles = pattern
        if not properties:
            self.property = {"s_m_title": ""}
        elif isinstance(properties, str):
            # Support for previous API, where the title was passed
            self.property = {"s_m_title": properties}
        else:
            self.property = dict(properties)

    def __str__(self):
        """
        Return a string representation of this structure.
        """
        return "SmilesStructure(%s)" % (self.smiles)

    def write(self, filename):
        """
        Write the structure to a SMILES formatted file.
        """
        with _get_file_handle(filename, 'wt') as fh:
            self._writeSmiles(fh)

    def append(self, filename):
        """
        Append the structure to a SMILES formatted file.
        """
        with _get_file_handle(filename, 'at') as fh:
            self._writeSmiles(fh)

    def _writeSmiles(self, fh):
        """
        Write one "<smiles> <title>" line to the open text file handle.
        """
        # The property dict may lack a title (e.g. when it was built from a
        # CSV row without a name column); write an empty one in that case.
        text = "%s %s\n" % (self.smiles, self.title)
        fh.write(text)

    def _writeSmilesCsv(self, filename, append):
        """
        Placeholder; does nothing.
        """
        pass

    def get2dStructure(self, add_hydrogens=False):
        """
        Return a 2D Structure object for this SMILES. The structure will
        have only 2D coordinates, with stereo annotation properties for
        chiral atoms with specified chirality. NOTE: Use for 2D applications
        only.

        :type add_hydrogens: bool
        :param add_hydrogens: Whether hydrogens should be added to the
            generated structure.

        :rtype: `Structure.Structure`
        :return: 2D structure.

        :raises ValueError: if self.smiles is set to an invalid SMILES
            string.
        """
        st = None
        if mmutil.feature_flag_is_enabled(
                mmutil.USE_RDKIT_FOR_SMILESSTRUCTURE):
            if add_hydrogens:
                hydrogens = adapter.Hydrogens.EXPLICIT
            else:
                hydrogens = adapter.Hydrogens.AS_INPUT
            st = adapter.to_structure(self.smiles,
                                      adapter.Generate2DCoordinates.Enable,
                                      hydrogens)
        else:
            adaptor = canvas.ChmMmctAdaptor()
            try:
                # Only the first whitespace-separated token is parsed here
                chmmol = canvas.ChmMol.fromSMILES(
                    self.smiles.split(maxsplit=1)[0])
            except RuntimeError as err:
                if str(err).startswith("Unable to parse SMILES"):
                    raise ValueError(err) from err
                raise

            if add_hydrogens:
                atom_option = canvas.ChmAtomOption.H_All
                h_visibility = canvas.optionMDL.H_Visible
            else:
                atom_option = canvas.ChmAtomOption.H_Default
                h_visibility = canvas.optionMDL.H_AsWritten

            canvas.Chm2DCoordGen.generateAndApply(chmmol, atom_option)
            st = Structure(adaptor.create(chmmol, False, h_visibility))

        st.property.update(self.property)
        return st

    def get3dStructure(self, require_stereo=True):
        """
        Return a 3D Structure object for this SMILES with all hydrogens
        added.

        :type require_stereo: bool
        :param require_stereo: Whether to require all chiral centers to have
            defined stereochemistry via annotation properties. Defaults to
            True. UndefinedStereochemistry exception is raised if any chiral
            atom has ambiguous chirality. If set to False, ambiguous
            chiralities will be expanded arbitrarily.

        :rtype: `Structure.Structure`
        :return: Volumized 3D structure.
        """
        st = self.get2dStructure()
        st.generate3dConformation(require_stereo)
        return st

    @property
    def title(self):
        """
        The `s_m_title` property; an empty string when it is not set.
        """
        return self.property.get('s_m_title', '')

    @title.setter
    def title(self, title):
        self.property['s_m_title'] = title
class SmilesReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a SMILES formatted file.
    Returns instances of SmilesStructure.

    When the USE_RDKIT_FOR_SMILESSTRUCTURE feature flag is enabled,
    this class will parse CXSMILES strings.
    """

    def __init__(self, filename, index=1):
        """
        Initialize with a filename, an optional starting index (default of
        1).

        :raise Exception: If the file is not SMILES-formatted, or if the end
            of the file is reached before the starting position.
        """
        format = _check_format(filename)
        if format != SMILES:
            raise Exception("SmilesReader can read only SMILES-formatted files")

        self.fh = _get_file_handle(filename)

        # Advance to the requested start position, one line per index.
        # NOTE(review): blank lines are counted here although __next__()
        # does not return them -- confirm whether that is intended.
        current_structure = 1
        while current_structure < index:
            line = self.fh.readline()
            if not line:
                # Don't leave the file open when construction fails
                self.close()
                raise Exception("SmilesReader: reached EOF before reaching "
                                "specified position (%i)" % index)
            current_structure += 1

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is not None:
            self.fh.close()
            self.fh = None

    def __del__(self):
        self.close()

    # required for iterator support
    def __iter__(self):
        """
        Return the iterator for all SmilesStructures from the file
        """
        return self

    def __next__(self):
        """
        Return the next SmilesStructure from the file.
        Raises StopIteration on EOF.
        """
        # Skip blank lines with a loop (not recursion), so that any number
        # of consecutive blank lines can be handled.
        while True:
            line = self.fh.readline()
            if not line:
                # EOF
                raise StopIteration
            if line.strip():
                break

        s = line.rstrip("\r\n").split(None, 1)  # Fix for PYAPP-4659
        pattern = s[0]
        if len(s) == 1:
            title = ''
        else:
            # Check for CXSMILES extensions
            if s[1].startswith('|') and s[1].count('|') >= 2:
                separator = line[len(pattern)]
                try:
                    # CXSMILES can have other CXSMILES embedded inside curly
                    # braces, see examples in the R Group section in
                    # https://docs.chemaxon.com/display/docs/chemaxon-extended-smiles-and-smarts-cxsmiles-and-cxsmarts.md
                    # Find the CXSMILES delimiter + the .smi file separator
                    ext_end = s[1].index(f'|{separator}')
                except ValueError:
                    # No title after the extension block
                    if s[1].endswith('|'):
                        pattern = f'{pattern}{separator}{s[1]}'
                        properties = {"s_m_title": ''}
                        return SmilesStructure(pattern, properties)
                else:
                    pattern = f'{pattern}{separator}{s[1][:ext_end + 1]}'
                    properties = {
                        "s_m_title": _unquote_string(s[1][ext_end + 2:])
                    }
                    return SmilesStructure(pattern, properties)
            title = s[1]

        properties = {"s_m_title": _unquote_string(title)}
        return SmilesStructure(pattern, properties)
class SmilesCsvReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a SMILES CSV formatted file.
    This format is used by Canvas.
    Returns instances of SmilesStructure.

    When the USE_RDKIT_FOR_SMILESSTRUCTURE feature flag is enabled,
    this class will parse CXSMILES strings. The extension string
    must be part of the SMILES field, and must be enclosed in double
    quotes in case it contains any commas.
    """

    def __init__(self, filename, index=1):
        """
        Initialize with a filename, an optional starting index (default of
        1).

        :raise Exception: If the file is not SMILES CSV-formatted.
        :raise ValueError: If the file has fewer than `index` structures.
        """
        format = _check_format(filename)
        if format != SMILESCSV:
            raise Exception(
                "SmilesCsvReader can read only SMILES CSV-formatted files")

        # The csv module requires files to be opened with newline=''
        self.fh = _get_file_handle(filename, newline="")
        self.reader = csv.DictReader(self.fh)
        # fieldnames is None for an empty file
        self.reader.fieldnames = [
            _unquote_string(key) for key in (self.reader.fieldnames or [])
        ]

        if not any(key.lower() == 'smiles' for key in self.reader.fieldnames):
            warnings.warn(
                f'Could not find a header row in {filename}; assuming field names are [SMILES, TITLE]'
            )
            # Start over, treating the first row as data
            self.fh.seek(0)
            self.reader = csv.DictReader(self.fh,
                                         fieldnames=['smiles', 'title'])

        # Increment to specified index
        num_skipped = 0
        try:
            while num_skipped < index - 1:
                next(self.reader)
                num_skipped += 1
        except StopIteration:
            # num_skipped is the number of structures actually in the file
            self.close()
            raise ValueError("Structure index %i is not in input file "
                             "(total %i structures)" %
                             (index, num_skipped)) from None

    def __del__(self):
        self.close()

    # required for iterator support
    def __iter__(self):
        """
        Return the iterator for all SmilesStructures from the file
        """
        return self

    def __next__(self):
        """
        Return the next SmilesStructure from the file.
        Raises StopIteration on EOF.
        """
        row = next(self.reader)
        prop_dict = {}
        pattern = None
        for key, value in row.items():
            # Skip keys with missing values or extra values without a key
            if key is None or value is None:
                continue
            key_lower = key.lower()
            value = _unquote_string(value)
            if key_lower == 'smiles':
                pattern = value
            elif key_lower in ('name', 's_m_title', 'title', 'idnumber'):
                # All of these columns are stored as the title
                prop_dict['s_m_title'] = value
            else:
                prop_key, prop_value = _csv_parse_prop_value(key, value)
                prop_dict[prop_key] = prop_value

        return SmilesStructure(pattern, prop_dict)

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is not None:
            self.fh.close()
            self.fh = None
class SmilesWriter(_BaseWriter):
    """
    More efficient writing of a large number of structures to a single
    SMILES file.
    """

    def __init__(self, filename, overwrite=True, stereo=None):
        """
        :type filename: str
        :param filename: The filename to write to.

        :type overwrite: bool
        :param overwrite: If False, append to an existing file if it exists.

        :type stereo: enum
        :param stereo: See the `StructureWriter` class for documentation on
            the allowed values.
        """
        # The file is opened lazily by the first append() call.
        self.fh = None
        self.filename = os.path.abspath(filename)
        # Created on demand, for writing Structure objects
        self._smiles_generator = None
        self._stereo = (STEREO_FROM_ANNOTATION_AND_GEOM
                        if stereo is None else stereo)

        # The file is always opened in append mode, so "overwrite" is
        # implemented by removing an existing file up front.
        if overwrite and os.path.isfile(filename):
            fileutils.force_remove(filename)

    def append(self, st):
        """
        Append the provided structure to the open SMILES file.
        """
        _lazy_import_smiles()

        if self.fh is None:
            self.fh = _get_file_handle(self.filename, 'at')

        if isinstance(st, SmilesStructure):
            st._writeSmiles(self.fh)
            return

        # Assume st is a Structure object (generate SMILES)
        if not self._smiles_generator:
            self._smiles_generator = smiles.SmilesGenerator(
                stereo=self._stereo, unique=True)
        generated = self._smiles_generator.getSmiles(st)
        self.fh.write("%s %s\n" % (generated, st.title))

    def close(self):
        """
        Close the file.
        """
        if getattr(self, "fh", None) is None:
            return
        self.fh.close()
        self.fh = None

    def __del__(self):
        """
        Close the file when instance is deleted.
        """
        self.close()
class SmilesCsvWriter(_BaseWriter):
    """
    More efficient writing of a large number of structures to a single
    SMILES CSV file.
    """

    def __init__(self, filename, stereo=None, props=None):
        """
        :note: Excessive memory may be used by this class if the props
            argument is not specified and many structures are appended.

        :type filename: str
        :param filename: The filename to write to.

        :type stereo: enum
        :param stereo: See the `StructureWriter` class for documentation on
            the allowed values.

        :type props: list
        :param props: List of property names to export. If specified, then
            the CSV header is derived from this list, and structure lines
            are written by the append() method. If not specified, then CSV
            header will include all properties of all structures, and the
            output file will only be written when the close() method is
            called. (All structures will be cached in memory until flushed
            to disk.)
        """
        self.fh = None
        self.filename = os.path.abspath(filename)
        self._smiles_generator = None  # for writing Structure objects

        if stereo is None:
            self._stereo = STEREO_FROM_ANNOTATION_AND_GEOM
        else:
            self._stereo = stereo

        self._props = props
        # Used when props is not given: (pattern, prop_dict) of every
        # appended structure, and the union of their property names.
        self._ct_data_list = []
        self._ct_prop_names = []

        # NOTE: Always overwriting, because of the Canvas CSV header line:
        if os.path.isfile(filename):
            fileutils.force_remove(filename)

    def append(self, st):
        """
        Append the provided structure to the open SMILES CSV file.
        """
        pattern, prop_dict = self._getCtData(st)

        if self._props:  # props argument was specified
            if self.fh is None:
                # write header:
                self._ct_prop_names = self._props
                self._openWriter(self._ct_prop_names)
            # write st:
            self._writeRow(pattern, prop_dict)
        else:
            # Expand internal prop list:
            new_props = [p for p in prop_dict if p not in self._ct_prop_names]
            self._ct_prop_names.extend(new_props)
            self._ct_data_list.append((pattern, prop_dict))

    @staticmethod
    def _columnName(propname):
        """
        Return the CSV column name used for the given property name; the
        's_csv_' prefix is dropped.
        """
        if propname.startswith('s_csv_'):
            return propname[6:]
        return propname

    def _openWriter(self, propnames):
        """
        Open the CSV writer and write the header derived from the specified
        property names.
        """
        self.fh = _get_file_handle(self.filename,
                                   'wt',
                                   encoding="utf-8",
                                   newline="")
        header = ['SMILES']
        # Always call the title "NAME":
        if 's_m_title' in propnames:
            header.append('NAME')
        for propname in propnames:
            column = self._columnName(propname)
            if column != 's_m_title':
                header.append(column)
        self.writer = csv.DictWriter(self.fh,
                                     fieldnames=header,
                                     extrasaction='ignore')
        self.writer.writeheader()

    def _writeRow(self, pattern, prop_dict):
        """
        Write a row to the CSV file, include all properties in
        self._ct_prop_names.
        """
        # The header uses the names without the 's_csv_' prefix, so the row
        # has to be keyed the same way or those columns would be written
        # empty. Properties whose name already equals a column name take
        # precedence.
        row = {
            self._columnName(key): value
            for key, value in prop_dict.items()
            if key.startswith('s_csv_')
        }
        row.update(prop_dict)
        row['SMILES'] = pattern
        row['NAME'] = prop_dict.get('s_m_title', '')
        # Keys that are not header columns are ignored by the writer
        self.writer.writerow(row)

    def _getCtData(self, st):
        """
        Return a (SMILES pattern, property dict copy) tuple for the given
        SmilesStructure or Structure.
        """
        _lazy_import_smiles()

        prop_dict = collections.OrderedDict(st.property)

        if isinstance(st, SmilesStructure):
            pattern = st.smiles
        else:
            # Assume st is a Structure object (generate SMILES)
            if not self._smiles_generator:
                self._smiles_generator = smiles.SmilesGenerator(
                    stereo=self._stereo, unique=True)
            pattern = self._smiles_generator.getSmiles(st)

        return (pattern, prop_dict)

    def close(self):
        """
        Close the file and write the data if props was computed on the fly.
        """
        if self._props:  # props argument was specified
            if self.fh is not None:
                self.fh.close()
                self.fh = None
        elif self._ct_data_list:  # If there are structures to write
            # write header row:
            self._openWriter(self._ct_prop_names)
            for pattern, prop_dict in self._ct_data_list:
                # write row
                self._writeRow(pattern, prop_dict)
            self.fh.close()
            self.fh = None
            self._ct_data_list.clear()

    def __del__(self):
        """
        Close the file when instance is deleted.
        """
        self.close()
class MultiFileStructureReader(_ReaderWriterContextManager):
    """
    Provides a single iterator that reads structure from multiple files.
    Typical usage is identical to typical usage of the StructureReader class
    except that the class is instantiated with a python list of file names
    rather than a single file name.

    By default, the StructureReader class is used to read the files, but
    this is customizable with the reader_class keyword.

    API Example::

        names = ['file1.mae', 'file2.mae', 'file3.pdb']
        reader = MultiFileStructureReader(names)
        first_struct = next(reader)
        for struct in reader:
            do stuff

    Exceptions raised while opening or reading a file are re-raised unless
    pass_errors=True is given, in which case the file is skipped and
    recorded in the failed_files property. The current StructureReader can
    be accessed with the reader property
    """

    def __init__(self, files, *args, **kwargs):
        """
        Create a MultiFileStructureReader

        :type files: list
        :param files: A list of paths to files to be read

        :type reader_class: Reader class
        :keyword reader_class: By default, StructureReader is used to read
            the files. A more specific class can be provided, such as
            PDBReader

        :type pass_errors: bool
        :keyword pass_errors: If True, any filename that raises an expected
            exception will be skipped. Skipped filenames are stored in the
            failed_files property and can be retrieved after reading. Items
            of the failed_files list are tuples (filename, error_message).
            Expected Exceptions include: IOError (does not exist, or
            unreadable), ValueError (unknown extension), MmException (error
            opening file) or an Exception while reading structures. The
            default of False will cause the exceptions to be raise'd.

        :type skip_receptors: bool
        :keyword skip_receptors: Whether to skip receptors of PV files.

        Any additional parameters and keyword arguments are passed to the
        structure reader class.
        """
        self.reader_class = kwargs.pop('reader_class', StructureReader)
        """ The class used to read files """
        self.pass_errors = kwargs.pop('pass_errors', False)
        """ False if exceptions should be raised, True if they should be caught
        """
        self.skip_receptors = kwargs.pop('skip_receptors', False)
        self.args = args
        self.kwargs = kwargs
        self.files = files[:]
        """ List of files remaining to be read """
        self.current_filename = ""
        """ The file currently being read """
        self.index_in_current_file = None
        """ Index of current structure in current file """
        self.failed_files = []
        """ List of (failed_file_name, error_message) """
        self.reader = None
        """ Current file reader """
        self._createNewReader()

    def __iter__(self):
        """
        Required to make the class an iterator
        """
        return self

    def _createNewReader(self):
        """
        Create a file reader for the next file. Sets self.reader = None if
        there are no more files to be read.
        """
        if self.reader is not None:
            # Explicitly closing and unsetting the reader helps with garbage
            # collection in fast loops.
            self.reader.close()
        self.reader = None
        self.index_in_current_file = None

        while self.reader is None:
            try:
                # Raises IndexError if the file list is now empty
                self.current_filename = self.files.pop(0)
            except IndexError:
                # The file list was empty
                self.reader = None
                self.index_in_current_file = None
                return

            try:
                self.reader = self.reader_class(self.current_filename,
                                                *self.args, **self.kwargs)
                self.index_in_current_file = 0
                if self.skip_receptors and fileutils.is_poseviewer_file(
                        self.current_filename):
                    try:
                        next(self.reader)
                    except StopIteration:
                        # The file holds no structures at all. Letting the
                        # StopIteration escape would end the whole
                        # iteration (or leak out of __init__) and silently
                        # drop the remaining files, so move on instead.
                        self.reader.close()
                        self.reader = None
                        self.index_in_current_file = None
                        continue
                    self.index_in_current_file = 1
            except (IOError, ValueError, mmcheck.MmException) as exc:
                # Possible expected errors
                if self.pass_errors:
                    self.failed_files.append((self.current_filename, str(exc)))
                else:
                    raise

    def __next__(self):
        """
        Get the next structure to process. This might either be the next
        structure in the currently open file, or might result in the next
        file being opened.

        :raise StopIteration: When all structures in all files have been
            read
        """
        while True:
            # Just looping through until a reader is successfully created
            # and we return a structure, or we hit the end of all the files.
            if self.reader is None:
                # No more files, we're done
                raise StopIteration()
            try:
                self.index_in_current_file += 1
                return next(self.reader)
            except StopIteration:
                # No more structures in the current file, start the next
                # file
                self._createNewReader()
            except Exception as exc:
                if self.pass_errors:
                    self.failed_files.append((self.current_filename, str(exc)))
                    self._createNewReader()
                else:
                    # This raises the caught Exception because we don't
                    # recognize it
                    raise
class MultiFileStructureWriter(_ReaderWriterContextManager):
    """
    Similar to StructureWriter, except that it writes to multiple files,
    while keeping the number of structures per file under sts_per_file.

    Files will be named <basename>-NNN<extension>. Default extension is
    .maegz.

    Options:

    basename - The base name of the written files
    extension - The extension of the written files (default ".maegz")
    sts_per_file - Maximum number of structures to write to each file

    Usage::

        writer = MultiFileStructureWriter(out_basename, ".maegz", 50)
        for st in sts:
            writer.append(st)
        writer.close()
        written_files = writer.getFiles()
    """

    def __init__(self, basename, extension=".maegz", sts_per_file=100000):
        """
        Store the naming options; no file is created until the first
        append() call.
        """
        self._basename = basename
        self._extension = extension
        self._max_file_size = sts_per_file

        # Bookkeeping of what has been written so far
        self._files = []
        self._total_sts_written = 0

        # State of the file currently being written
        self.current_filename = None
        self.current_writer = None
        self.index_in_current_file = 0

    def _startNextFile(self):
        """
        Close the current writer (if any) and open a writer for the next
        numbered file.
        """
        if self.current_writer:
            self.current_writer.close()
        file_number = len(self._files) + 1
        self.current_filename = '%s-%s%s' % (
            self._basename, str(file_number).zfill(3), self._extension)
        self.current_writer = StructureWriter(self.current_filename)
        self._files.append(self.current_filename)
        self.index_in_current_file = 0

    def append(self, st):
        """
        Write the structure to the current file, starting a new file first
        if there is none yet or the current one is full.
        """
        file_is_full = self.index_in_current_file >= self._max_file_size
        if not self.current_filename or file_is_full:
            self._startNextFile()

        self.current_writer.append(st)
        self.index_in_current_file += 1
        self._total_sts_written += 1

    def getFiles(self):
        """
        Return a list of file paths for the written files.
        """
        return self._files

    def getNumStructures(self):
        """
        Return the total number of structures that were written.
        """
        return self._total_sts_written

    def close(self):
        """
        Close any open file handles
        """
        if not self.current_writer:
            return
        self.current_writer.close()
def _lazy_import_smiles():
    """
    Import schrodinger.structutils.smiles into the module-level `smiles`
    name, unless that was already done.
    """
    # Can not be done earlier due to circular import
    global smiles
    if smiles is None:
        import schrodinger.structutils.smiles as smiles


def _get_file_handle(filename, mode='rt', *, encoding=None, newline=None):
    """
    Helper function to open either a common file handle, or a gzipped one.
    Text mode is required for csv files, so we enforce it here.

    :param filename: Path to open; a name ending in "gz" (case-insensitive)
        is opened with gzip.
    :type filename: str or pathlib.Path

    :param mode: Mode passed on to open() / gzip.open().
    :param encoding: Encoding passed on to open() / gzip.open().
    :param newline: Newline handling passed on to open() / gzip.open().

    :raise IOError: If a gzipped file is requested in a non-text mode.
    """
    opts = {'mode': mode, 'encoding': encoding, 'newline': newline}
    fname = str(filename)  # filename may be a pathlib obj
    if fname.lower().endswith('gz'):
        if 't' not in mode:
            raise IOError('Gzipped files must be opened in text mode.')
        return gzip.open(filename, **opts)
    return open(filename, **opts)


def _csv_bool(value):
    """
    Convert the text of a CSV cell to a bool. An empty cell, "0" and "false"
    (in any letter case) are False; any other text is True.
    """
    return value.strip().lower() not in ('', '0', 'false')


def _csv_parse_prop_value(key, value):
    """
    Parse a property key, and if it has a proper type prefix, cast the
    associated value into the proper type. If there is no type prefix, or
    the cast fails, mark the key as being a string from a csv file.

    :param key: CSV column name.
    :type key: str

    :param value: Text of the CSV cell.
    :type value: str

    :return: (property name, value) tuple
    :rtype: tuple
    """
    # A typed property name looks like "<type char>_<family>_<name>". The
    # length check keeps one-character (or empty) column names from raising
    # IndexError.
    if len(key) < 2 or key[1] != '_' or key.find('_', 3) == -1:
        return _format_custom_property(key), value

    type_char = key[0]
    if type_char == 's':
        return key, value

    # bool() can't be used for 'b': any non-empty string, including "0" and
    # "False", is truthy.
    cast_function = {'i': int, 'r': float, 'b': _csv_bool}.get(type_char)
    if cast_function is None:
        return _format_custom_property(key), value

    try:
        return key, cast_function(value)
    except ValueError:
        return _format_custom_property(key), value


def _unquote_string(s):
    """
    Return s stripped of surrounding whitespace and of one pair of matching
    single or double quotes around it, if present.
    """
    s = s.strip()
    if len(s) > 1 and s[0] == s[-1] and s[0] in ('"', "'"):
        s = s[1:-1]
    return s


def _format_custom_property(key):
    """
    Turn a CSV column name into an 's_csv_' property name: underscores that
    follow a character other than a backslash are backslash-escaped, then
    spaces are replaced by underscores.
    """
    # escape all free standing underscores
    key = re.sub(r'(?<=[^\\])_', r'\_', key)
    key = key.replace(' ', '_')
    return f's_csv_{key}'