Source code for schrodinger.rdkit.molio

"""
PathFinder helper functions for reading and writing files using RDKit Mol
objects.
"""

import collections
import copy
import csv
import gzip
import heapq
import io
import json
import os
import shutil
import sys
import tempfile
import zipfile
from contextlib import ExitStack

import more_itertools
import psutil
from rdkit import Chem

from schrodinger import structure
from schrodinger.rdkit import rdkit_adapter
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils.fileutils import open_maybe_compressed

logger = log.get_output_logger('pathfinder')

# Empirically, the file handle limit seems to be max - 4: errors are thrown
# at max - 3, and runs succeed at max - 4.
MAX_FILE_HANDLE_PADDING = 4
# Arbitrary number of file handles allowed for Windows.
DEFAULT_MAX_FILE_HANDLES = 512

# Filename extension for PathFinder reactant files
PFX = '.pfx'
METADATA = 'metadata.json'
STRUCTURES = 'structures.csv'


class MolWriter(structure.StructureWriter):
    """
    Write Mol objects to a file using a StructureWriter-like API, optionally
    generating 3D coordinates.
    """

    def __init__(self, filename, generate_coordinates=True,
                 require_stereo=False):
        super().__init__(filename)
        self.generate_coordinates = generate_coordinates
        self.require_stereo = require_stereo

    def append(self, mol):
        st = rdkit_adapter.from_rdkit(mol)
        if self.generate_coordinates:
            st.generate3dConformation(require_stereo=self.require_stereo)
        super().append(st)

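# Example (illustrative sketch; assumes "out.maegz" is a writable path and
# that 3D coordinate generation is available):
#
#     mol = Chem.MolFromSmiles('CCO')
#     mol.SetProp('_Name', 'ethanol')
#     with MolWriter('out.maegz') as writer:
#         writer.append(mol)  # converts to Structure, generates 3D coords
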
class StructureReaderAdapter:
    """
    A wrapper for a Structure reader, which, when iterated through, yields
    RDKit Mol objects, and can also be used as a context manager that closes
    the reader on exit.
    """

    def __init__(self, reader, implicitH=True):
        """
        :param reader: source of structures to convert
        :type reader: iterable of Structure

        :param implicitH: use implicit hydrogens
        :type implicitH: bool
        """
        self.reader = reader
        self.implicitH = implicitH

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        try:
            self.reader.close()
        except AttributeError:
            # In case `reader` wasn't really a StructureReader but was
            # something like a list of Structure.
            pass

    def __iter__(self):
        for st in self.reader:
            try:
                yield rdkit_adapter.to_rdkit(st,
                                             implicitH=self.implicitH,
                                             include_coordinates=False)
            except (ValueError, RuntimeError) as e:
                logger.warning(e)

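# Example (illustrative sketch; assumes "input.maegz" exists):
#
#     reader = structure.StructureReader('input.maegz')
#     with StructureReaderAdapter(reader) as mols:
#         for mol in mols:
#             print(Chem.MolToSmiles(mol))
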
class BaseCsvMolReader:
    """
    Parent class for CsvMolReader and CsvMolIterator.
    """

    def __init__(self, file):
        """
        :param file: CSV filename (file may be compressed) or file-like
            object.
        """
        if hasattr(file, 'read'):
            self.fh = file
        else:
            self.fh = open_maybe_compressed(file, 'rt')
        header = self.fh.readline()
        self.fieldnames = next(csv.reader([header]))
        SKIPPED = {'SMILES', 'NAME', ''}
        self.propnames = [f for f in self.fieldnames if f not in SKIPPED]

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        self.fh.close()

    def _parseLine(self, line):
        vals = next(csv.reader([line]))
        row = dict(zip(self.fieldnames, vals))
        mol = Chem.MolFromSmiles(row['SMILES'])
        if mol is None:
            return None
        for prop in self.propnames:
            if row[prop]:
                mol.SetProp(prop, row[prop])
        for prop in ['NAME', 's_m_title']:
            if prop in row:
                mol.SetProp('_Name', row[prop])
                break
        return mol

class CsvMolReader(BaseCsvMolReader):
    """
    Read a SMILES CSV file, returning Mol objects. This is similar to RDKit's
    SmilesMolSupplier with delimiter=',', except that it uses the csv module
    instead of naively splitting on commas. This makes it possible to have
    field values containing commas, as long as they are quoted following the
    CSV convention. Note, however, that for efficiency reasons multi-line
    records are still not supported. Gzip-compressed files (identified by a
    filename ending in "gz") are also supported.

    A CsvMolReader supports random access, like a list. Upon instantiation,
    the file is read in full and kept in memory. For a CSV file having only a
    SMILES and an ID, this takes about 100 MB per million entries.
    """

    def __init__(self, file):
        super().__init__(file)
        with self.fh:
            self.lines = self.fh.readlines()

    def __len__(self):
        return len(self.lines)

    def __getitem__(self, index):
        return self._parseLine(self.lines[index])

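# Example (illustrative sketch; assumes "mols.csv" has SMILES and NAME
# columns):
#
#     with CsvMolReader('mols.csv') as reader:
#         print(len(reader))           # whole file is held in memory
#         mol = reader[-1]             # random access, like a list
#         print(mol.GetProp('_Name'))
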
class CsvMolIterator(BaseCsvMolReader):
    """
    Read a SMILES CSV file, returning Mol objects. Unlike CsvMolReader,
    CsvMolIterator does not support random access, but since it only keeps
    one line in memory at a time, memory use is minimal.
    """

    def __iter__(self):
        return self

    def __next__(self):
        return self._parseLine(next(self.fh))

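# Example (illustrative sketch; streams "mols.csv.gz" without loading it all
# into memory; unparseable records yield None unless wrapped in NoneSkipper):
#
#     with CsvMolIterator('mols.csv.gz') as mols:
#         for mol in mols:
#             if mol is not None:
#                 print(Chem.MolToSmiles(mol))
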
class CsvMolWriter:
    """
    Write a CSV file given Mol objects, using a StructureWriter-like API.
    The first two columns are the SMILES and title, and the rest are the
    properties of the molecule.

    - We don't use structure.SmilesCsvWriter because it is too slow due to
      all the conversions (the overall job takes 4 times as long, making the
      writing of the output file the clear bottleneck).

    - We don't use Chem.SmilesWriter because, even though it can use a comma
      as a delimiter, it doesn't write proper CSV files: it doesn't know how
      to escape the delimiter.

    Gzip-compressed files (identified by a filename ending in "gz") are also
    supported.
    """

    def __init__(self, filename, properties=None, cxsmiles=False):
        """
        :param filename: file to write
        :type filename: str or file-like object

        :param properties: optional, list of names of properties to write to
            the output file. If None, all the properties are written.
            (CAVEAT: if `filename` is a file object rather than an actual
            filename, only the properties present in the first molecule are
            written.)
        :type properties: list of str or None

        :param cxsmiles: when writing SMILES, use CXSMILES extensions
        :type cxsmiles: bool
        """
        if hasattr(filename, 'write'):
            self.fh = filename
            self.unionize_props = False
        else:
            self.fh = open_maybe_compressed(filename, 'wt', newline='')
            self.unionize_props = properties is None
            self.filename = filename
            _, self.suffix = fileutils.splitext(self.filename)
        self._writer = None
        self.written_count = 0
        self.properties = properties
        self.cxsmiles = cxsmiles
        self.tmpfiles = []

    def append(self, mol):
        """
        Write a molecule to the file. The first time this is called, the
        header row is written based on mol's properties or the properties
        passed to __init__, if any.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol
        """
        props = self._getProps(mol)
        props_list = list(props)
        if (self._writer and self.unionize_props and
                props_list != self.properties):
            self.properties = props_list
            self._openTmp()
        if self._writer is None:
            if self.properties is None:
                self.properties = list(props)
            self._initWriter(self.properties)
        props['SMILES'] = self.toSmiles(mol)
        props['NAME'] = mol.GetProp('_Name')
        self._writer.writerow(props)
        self.written_count += 1

    def toSmiles(self, mol):
        if self.cxsmiles:
            # Remove atom properties added by reaction because they are
            # fairly useless in a CXSMILES and take a lot of space.
            new_mol = remove_react_atom_props(mol)
            return Chem.MolToCXSmiles(new_mol)
        else:
            return Chem.MolToSmiles(mol)

    def _getProps(self, mol):
        """
        Return a dictionary of molecule properties after some munging.
        Property names are renamed to follow the Schrodinger convention, and
        float values are rounded for cosmetic reasons.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol

        :return: molecule properties
        :rtype: dict
        """
        raw_props = rdkit_adapter.translate_rdkit_props_dict(
            mol.GetPropsAsDict())
        # We reduce float precision because RDKit produces values such as
        # 320.41100000000006 where we would rather see 320.411.
        props = {}
        for name, val in raw_props.items():
            if name.startswith('r_'):
                props[name] = round(val, 6)
            else:
                props[name] = val
        return props

    def _initWriter(self, props):
        """
        Initialize the underlying CSV writer, using `props` to write the
        header row. "SMILES" and "NAME" are always added as the first two
        columns.

        :param props: property names
        :type props: iterable of str
        """
        fields = ['SMILES', 'NAME'] + sorted(props)
        self._writer = csv.DictWriter(self.fh, fields, extrasaction='ignore')
        self._writer.writeheader()

    def _openTmp(self):
        logger.debug(f'Extended properties: {self.properties}')
        with tempfile.NamedTemporaryFile(dir='.', suffix=self.suffix,
                                         delete=False) as fh:
            tmpname = fh.name
        self.fh.close()
        self._writer = None
        self.fh = open_maybe_compressed(tmpname, 'wt', newline='')
        self.tmpfiles.append(tmpname)

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        self.fh.close()
        if self.tmpfiles:
            self._mergeTmpfiles()

    def _mergeTmpfiles(self):
        with tempfile.NamedTemporaryFile(dir='.', suffix=self.suffix,
                                         delete=False) as fh:
            tmpname = fh.name
        fileutils.force_rename(self.filename, tmpname)
        self.tmpfiles.insert(0, tmpname)
        logger.debug(f'Merging tmpfiles: {self.tmpfiles}')
        merge_handler = CsvMergeHandler(self.tmpfiles, self.filename,
                                        dedup_field='SMILES')
        merge_files_in_memory(self.tmpfiles, self.filename, merge_handler,
                              dedup=False)
        fileutils.force_remove(*self.tmpfiles)

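# Example (illustrative sketch; writes a two-row CSV whose header is based
# on the properties of the first molecule):
#
#     with CsvMolWriter('products.csv') as writer:
#         for smi in ('CCO', 'CCN'):
#             mol = Chem.MolFromSmiles(smi)
#             mol.SetProp('_Name', smi)
#             writer.append(mol)
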
class BasePfxMolReader:
    """
    Parent class for PfxMolReader and PfxMolIterator.
    """

    def __init__(self, filename):
        """
        :type filename: str
        """
        self.zipfh = zipfile.ZipFile(filename, 'r')
        fh = io.TextIOWrapper(self.zipfh.open(STRUCTURES))
        self.csv_mol_reader = self.csv_mol_reader_class(fh)

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        self.csv_mol_reader.close()
        self.zipfh.close()

class PfxMolReader(BasePfxMolReader):
    """
    Reader for PFX (PathFinder reactants) files. These are really zip
    archives containing a CSV file and a metadata JSON file.

    Like CsvMolReader, PfxMolReader supports random access, like a list.
    Upon instantiation, the file is read in full and kept in memory. For a
    file having only a SMILES and an ID, this takes about 100 MB per million
    entries.
    """
    csv_mol_reader_class = CsvMolReader

    def __len__(self):
        return len(self.csv_mol_reader)

    def __getitem__(self, index):
        return self.csv_mol_reader[index]

class PfxMolIterator(BasePfxMolReader):
    """
    Reader for PFX (PathFinder reactants) files. These are really zip
    archives containing a CSV file and a metadata JSON file.

    Unlike PfxMolReader, PfxMolIterator does not support random access, but
    since it only keeps one line in memory at a time, memory use is minimal.
    """
    csv_mol_reader_class = CsvMolIterator

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.csv_mol_reader)

class PfxMolWriter:
    """
    Writer for PFX (PathFinder reactants) files. These are really zip
    archives containing a CSV file and a metadata JSON file.
    """

    def __init__(self, filename, properties=None):
        """
        :param filename: file to write
        :type filename: str

        :param properties: optional, list of names of properties to write to
            the output file. If None, all the properties present on the
            first structure will be written (the assumption is that all
            molecules will have the same properties, or at least that the
            first molecule has all the properties that we care about).
        :type properties: list of str or None
        """
        self.zipfh = zipfile.ZipFile(filename, 'w',
                                     compression=zipfile.ZIP_DEFLATED)
        fh = io.TextIOWrapper(self.zipfh.open(STRUCTURES, 'w'), newline='')
        self.csv_mol_writer = CsvMolWriter(fh, properties)

    def append(self, mol):
        """
        Write a molecule to the file.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol
        """
        self.csv_mol_writer.append(mol)

    @property
    def written_count(self):
        return self.csv_mol_writer.written_count

    def _writeMetadata(self):
        metadata = {'size': self.written_count}
        with io.TextIOWrapper(self.zipfh.open(METADATA, 'w')) as fh:
            json.dump(metadata, fh)
            fh.write('\n')

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        self.csv_mol_writer.close()
        self._writeMetadata()
        self.zipfh.close()

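# Example (illustrative sketch; round-trips one molecule through a .pfx
# archive):
#
#     mol = Chem.MolFromSmiles('c1ccccc1')
#     mol.SetProp('_Name', 'benzene')
#     with PfxMolWriter('reactants.pfx') as writer:
#         writer.append(mol)
#     print(get_pfx_size('reactants.pfx'))  # -> 1, from metadata.json
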
class RdkitMolWriter:
    """
    Write Mol objects to a file using the RDKit file-writing classes, but
    with a StructureWriter-like API. Supports SMILES and SDF.
    """

    def __init__(self, filename, v3000=False):
        """
        :param filename: filename to write
        :type filename: str

        :param v3000: when writing SD, force the use of the V3000 format
        :type v3000: bool
        """
        self.rdkit_writer = None
        if fileutils.is_gzipped_structure_file(filename):
            # Text mode, because the RDKit writers write str, not bytes.
            self.fh = gzip.open(filename, 'wt')
        else:
            self.fh = open(filename, 'w')
        if fileutils.is_sd_file(filename):
            self.rdkit_writer = Chem.SDWriter(self.fh)
            self.rdkit_writer.SetForceV3000(v3000)
        elif fileutils.is_smiles_file(filename):
            self.rdkit_writer = Chem.SmilesWriter(self.fh,
                                                  includeHeader=False,
                                                  isomericSmiles=True)
        else:
            raise ValueError("Unsupported output file type.")

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    @property
    def written_count(self):
        return self.rdkit_writer.NumMols()

    def append(self, mol):
        self.rdkit_writer.write(mol)

    def close(self):
        self.rdkit_writer.close()
        self.fh.close()

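# Example (illustrative sketch; the filename suffix selects the underlying
# RDKit writer):
#
#     with RdkitMolWriter('out.sdf', v3000=True) as writer:
#         writer.append(Chem.MolFromSmiles('CCO'))
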
class NoneSkipper:
    """
    A wrapper for a mol supplier, which, when iterated through, skips the
    `None` mols, and can also be used as a context manager.
    """

    def __init__(self, supplier):
        """
        :param supplier: supplier of molecules
        :type supplier: iterable of Mol
        """
        self.supplier = supplier

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        try:
            self.supplier.close()
        except AttributeError:
            pass

    def __iter__(self):
        for mol in self.supplier:
            if mol is not None:
                yield mol

    def __len__(self):
        # This will only work if the supplier supports it!
        return len(self.supplier)

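# Example (illustrative sketch; hides the None entries an RDKit supplier
# yields for unparseable records):
#
#     with NoneSkipper(Chem.SDMolSupplier('input.sdf')) as mols:
#         for mol in mols:
#             ...  # mol is never None here
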
class GzippedSDMolSupplier(Chem.ForwardSDMolSupplier):
    """
    Subclass of ForwardSDMolSupplier to read gzip-compressed files. Use as a
    context manager to ensure that the file gets closed.
    """

    def __init__(self, filename, *a, **kw):
        """
        :param str filename: gzip-compressed file
        :param a: positional arguments to pass through to parent
        :param kw: keyword arguments to pass through to parent
        """
        self._gzip_fh = gzip.open(filename, 'rb')
        super().__init__(self._gzip_fh, *a, **kw)

    def __enter__(self):
        super().__enter__()
        return self

    def __exit__(self, *a):
        ret = super().__exit__(*a)
        self._gzip_fh.close()
        return ret

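# Example (illustrative sketch; assumes "input.sdf.gz" exists):
#
#     with GzippedSDMolSupplier('input.sdf.gz') as supplier:
#         for mol in supplier:
#             if mol is not None:
#                 print(Chem.MolToSmiles(mol))
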
def get_mol_writer(filename, generate_coordinates=True, require_stereo=False,
                   v3000=False, cxsmiles=False):
    """
    Return a StructureWriter-like object based on the command-line arguments.
    RDKit is used for non-Maestro formats.

    :param filename: filename to write
    :type filename: str

    :param generate_coordinates: generate 3D coordinates (non-SMILES formats)
    :type generate_coordinates: bool

    :param require_stereo: when generating coordinates, fail when there's
        unspecified stereochemistry, instead of producing an arbitrary isomer
    :type require_stereo: bool

    :param v3000: when writing SD, force the use of the V3000 format
    :type v3000: bool

    :param cxsmiles: when writing SMILES, use CXSMILES extensions
    :type cxsmiles: bool
    """
    if fileutils.is_maestro_file(filename):
        return MolWriter(filename,
                         generate_coordinates=generate_coordinates,
                         require_stereo=require_stereo)
    elif fileutils.is_csv_file(filename) or is_csvgz(filename):
        return CsvMolWriter(filename, cxsmiles=cxsmiles)
    else:
        return RdkitMolWriter(filename, v3000=v3000)

def supported_output_format(filename):
    """
    Check whether we know how to write a file with a given name, without
    actually opening a file. Used for argument validation.

    :type filename: str
    :rtype: bool
    """
    fmt = fileutils.get_structure_file_format(filename)
    return fmt in {
        fileutils.MAESTRO, fileutils.SD, fileutils.SMILES,
        fileutils.SMILESCSV
    } or is_csvgz(filename)

def get_mol_reader(filename, skip_bad=True, implicitH=True,
                   random_access=True):
    """
    Return a Mol reader given a filename or a SMILES string. For .smi and
    .csv files, use the RDKit SmilesMolSupplier; for other formats, use
    StructureReader but convert Structure to Mol before yielding each
    molecule.

    Whenever possible, the reader will be a Sequence. This is currently the
    case for .smi and .csv files when skip_bad is False. (And for a SMILES
    string, which returns a list of size 1.)

    :param skip_bad: if True, bad structures are skipped implicitly, instead
        of being yielded as None (only applies to SMILES and CSV formats.)
    :type skip_bad: bool

    :param implicitH: use implicit hydrogens (only has an effect when
        reading Maestro files)
    :type implicitH: bool

    :param random_access: if False, the reader object can only be used as an
        iterator, and the file is not read into memory all at once. (Only
        applies to CSV and PFX and is ignored for other formats, which
        provide no random access except for uncompressed SD.)
    :type random_access: bool

    :rtype: Generator or Sequence of Mol
    """
    if os.path.isfile(filename):
        if is_pfx(filename):
            format = PFX
        elif is_csvgz(filename):
            format = fileutils.SMILESCSV
        else:
            format = fileutils.get_structure_file_format(filename)
        logger.debug("Opening %s", filename)
        if format == fileutils.MAESTRO:
            reader = structure.StructureReader(filename)
            return StructureReaderAdapter(reader, implicitH)
        if format == fileutils.SMILES:
            supp = Chem.SmilesMolSupplier(filename,
                                          delimiter=' ',
                                          titleLine=False,
                                          nameColumn=1)
        elif format == fileutils.SMILESCSV:
            if random_access:
                supp = CsvMolReader(filename)
            else:
                supp = CsvMolIterator(filename)
        elif format == PFX:
            if random_access:
                supp = PfxMolReader(filename)
            else:
                supp = PfxMolIterator(filename)
        elif format == fileutils.SD:
            if filename.endswith('gz'):
                supp = GzippedSDMolSupplier(filename)
            else:
                supp = Chem.SDMolSupplier(filename)
        else:
            raise ValueError(f"Unsupported file format: {format}")
        if skip_bad:
            return NoneSkipper(supp)
        else:
            return supp
    else:
        mol = Chem.MolFromSmiles(filename)
        if mol is None:
            raise IOError(f"'{filename}' must be either a valid filename "
                          "or a valid SMILES")
        # We know mol is not None, but NoneSkipper also turns the list into
        # a context manager, which some callers expect.
        return NoneSkipper([mol])

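# Example (illustrative sketch; the same call accepts a filename or a bare
# SMILES string):
#
#     for mol in get_mol_reader('mols.csv'):
#         ...
#     mol = next(iter(get_mol_reader('CCO')))  # wraps a list of size 1
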
def get_mol(target, implicitH=True):
    """
    Read a Mol from a file or a SMILES string.

    :param target: filename or SMILES
    :type target: str

    :param implicitH: use implicit hydrogens (only has an effect when
        reading Maestro files)
    :type implicitH: bool

    :rtype: rdkit.Chem.Mol
    """
    reader = get_mol_reader(target, skip_bad=False, implicitH=implicitH)
    return next(iter(reader))

def combine_output_files(outfiles, out, dedup=True, sort=False,
                         union_csv_columns=False, rdkit=False, v3000=False):
    """
    Write the final output file.

    :param list[str] outfiles: subjob output filenames
    :param str out: output filename
    :param bool dedup: skip duplicate products
    :param bool sort: sort output (implies the subjob output is sorted)
    :param bool union_csv_columns: if csv, union infile columns.
    :param bool rdkit: Use an RDKit writer for SD files.
    :param bool v3000: If using an RDKit writer and writing an SD file,
        force V3000 format.
    """
    logger.info("Combining subjob output files into %s." % out)
    if dedup:
        logger.info("Duplicate products will be removed, possibly resulting "
                    "in a smaller number of products than originally "
                    "requested.")
    missing, existing = map(list,
                            more_itertools.partition(os.path.isfile,
                                                     outfiles))
    if missing:
        logger.warning('Missing output files:')
        for fname in missing:
            logger.warning(f'  {fname}')
    # ENUM-409: If we can simply concatenate the files, just do that.
    can_concatenate = not (sort or dedup or union_csv_columns)
    if can_concatenate:
        is_csv = fileutils.is_csv_file(out) or is_csvgz(out)
        if is_csv:
            # Use csv-specific concatenation to deal with headers.
            cat_csv_files(existing, out)
        else:
            # Use fileutils.cat directly if not dealing with a csv.
            fileutils.cat(existing, out)
        return
    # If the results are sorted, it's more efficient to merge as streams.
    merge_function = merge_files_as_streams if sort else merge_files_in_memory
    format_handler = get_format_handler(existing, out,
                                        union_csv_columns=union_csv_columns,
                                        rdkit=rdkit,
                                        v3000=v3000)
    n = merge_function(existing, out, format_handler, dedup)
    logger.info(f'Wrote {n} structures.')

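# Example (illustrative sketch; hypothetical subjob filenames):
#
#     combine_output_files(['sub1.csv', 'sub2.csv'], 'final.csv',
#                          dedup=True, sort=False)
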
def get_format_handler(infiles, outfile, union_csv_columns=False,
                       rdkit=False, v3000=False):
    """
    Return the appropriate format handler for a specified output file type.

    :param list[str] infiles: subjob output filenames, used as input for
        merging
    :param str outfile: output filename
    :param bool union_csv_columns: flag to write out the union of infile csv
        columns (if infile columns differ)
    :param bool rdkit: Use an RDKit writer for SD files.
    :param bool v3000: If using an RDKit writer and writing an SD file,
        force V3000 format.

    :return: instance of a subclass of BaseMergeHandler
    :rtype: CsvMergeHandler, StructureMergeHandler, or SmiMergeHandler
    """
    if fileutils.is_csv_file(outfile) or is_csvgz(outfile):
        # If we can't assume the first column is the compare key, use a
        # DictReader and DictWriter to find the SMILES column.
        dedup_field = 'SMILES' if union_csv_columns else None
        return CsvMergeHandler(infiles,
                               outfile,
                               dedup_field=dedup_field,
                               union_columns=union_csv_columns)
    if fileutils.is_smiles_file(outfile):
        return SmiMergeHandler()
    if rdkit:
        return RdkitMergeHandler(v3000=v3000)
    # Return a structure reader/writer factory by default.
    return StructureMergeHandler()

def merge_files_as_streams(infiles, outfile, file_handler, dedup):
    """
    Copy structures from `infiles` into `outfile`, rejecting duplicates
    based on `file_handler.getCompareKey`. Assumes infiles are sorted.

    :param infiles: names of the structure files to be joined
    :type infiles: iterable over str

    :param outfile: output file name
    :type outfile: str

    :param file_handler: object to handle open, read and write operations
        for the file.
    :type file_handler: instance of subclass of BaseMergeHandler

    :param dedup: flag to indicate if duplicate products should be removed
        from the merged output file
    :type dedup: bool

    :return: number of structures written
    :rtype: int
    """
    # Default file handle limit, also the Windows limit.
    max_files = DEFAULT_MAX_FILE_HANDLES
    # If not on Windows, we can use the actual file handle limit.
    if sys.platform != 'win32':
        from resource import RLIMIT_NOFILE
        from resource import getrlimit
        soft_limit, _ = getrlimit(RLIMIT_NOFILE)
        proc = psutil.Process()
        max_files = (soft_limit - len(proc.open_files()) -
                     MAX_FILE_HANDLE_PADDING)
    merge_count = 0
    st_written = 0
    file_queue = collections.deque(infiles)
    while len(file_queue) > 0:
        tmp_file = f"merge_tmp_{merge_count}_" + outfile
        merge_count += 1
        batch_iters = []
        with ExitStack() as stack:
            while len(file_queue) > 0 and len(batch_iters) < max_files:
                batch_iters.append(
                    stack.enter_context(
                        file_handler.getProductReader(file_queue.popleft())))
            # If this is the last batch, write to the final output file.
            if len(file_queue) < 1:
                tmp_file = outfile
            with file_handler.getProductAppender(tmp_file) as writer:
                # ENUM-410: merge the sorted files as streams with
                # heapq.merge, which forms a sorted heap without pulling all
                # items into memory at once.
                merged = heapq.merge(*batch_iters,
                                     key=file_handler.getCompareKey)
                last_smiles = ""
                for prod in merged:
                    cur_smiles = file_handler.getCompareKey(prod)
                    if dedup and cur_smiles == last_smiles:
                        continue
                    writer.append(prod)
                    last_smiles = cur_smiles
                    # Track final output file size.
                    if tmp_file == outfile:
                        st_written += 1
        # Add intermediate merge files to the queue.
        if tmp_file != outfile:
            file_queue.append(tmp_file)
    logger.info(f"{st_written} structures written.")
    return st_written

def merge_files_in_memory(infiles, outfile, filetype_handler, dedup):
    """
    Copy structures from `infiles` into `outfile`, rejecting duplicates
    based on `filetype_handler.getCompareKey`.

    :param infiles: names of the structure files to be joined
    :type infiles: iterable over str

    :param outfile: output file name
    :type outfile: str

    :param filetype_handler: object to handle open, read and write
        operations for the file.
    :type filetype_handler: instance of subclass of BaseMergeHandler

    :param dedup: flag to indicate if duplicate products should be removed
        from the merged output file
    :type dedup: bool

    :return: number of structures written
    :rtype: int
    """
    seen = set()
    nwritten = 0
    with filetype_handler.getProductAppender(outfile) as writer:
        for fname in infiles:
            with filetype_handler.getProductReader(fname) as reader:
                for prod in reader:
                    smiles = filetype_handler.getCompareKey(prod)
                    if not dedup or smiles not in seen:
                        writer.append(prod)
                        nwritten += 1
                        if dedup:
                            seen.add(smiles)
    return nwritten

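# Example (illustrative sketch; the merge handlers make the two merge
# functions format-agnostic):
#
#     handler = get_format_handler(['a.smi', 'b.smi'], 'merged.smi')
#     merge_files_in_memory(['a.smi', 'b.smi'], 'merged.smi', handler,
#                           dedup=True)
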
class BaseMergeHandler:
    """
    Base class for filetype handlers for subjob output deduplication and
    merging.
    """

    def getProductReader(self, file):
        """
        Given a file name, create and return an iterable file handle to
        iterate over all products.

        :param file: file name
        :type file: str

        :return: iterable context manager over filetype-specific product
            format
        :rtype: iterable
        """
        raise NotImplementedError

    def getProductAppender(self, file):
        """
        Given a file name, create and return a file-writing object that
        writes when its append() method is called.

        :param file: file name
        :type file: str

        :return: a file handle with context management that supports the
            append() call used in merge_files_in_memory and
            merge_files_as_streams.
        :rtype: file-like object
        """
        raise NotImplementedError

    def getCompareKey(self, product):
        """
        Given a product (formatted according to the filetype), return the
        computed comparison key (SMILES string) for the product.

        :param product: filetype-specific product
        :type product: filetype-specific product (type varies)
        """
        raise NotImplementedError

class CsvMergeHandler(BaseMergeHandler):
    """
    Class to bundle csv read/write operations.
    """

    def __init__(self, infiles, outfile, union_columns=True,
                 dedup_field=None):
        """
        :param infiles: list of input files whose columns may need to be
            joined.
        :type infiles: list(str)

        :param outfile: output file
        :type outfile: str

        :param union_columns: flag to write out the union of infile csv
            columns (if infile columns differ)
        :type union_columns: bool

        :param dedup_field: csv column to use to check for duplicates during
            deduplication
        :type dedup_field: str
        """
        self.header = None
        self.dedup_field = dedup_field
        self.fieldnames = None
        # Get fieldnames and use a csv DictReader/DictWriter if joining
        # columns, or deduplicating by a user-specified column name.
        if union_columns or dedup_field is not None:
            self.fieldnames = get_fieldnames(infiles)
        if len(infiles) > 0:
            self.first_file = infiles[0]

    def getProductReader(self, file):
        """
        Open a csv file, skip the first (header) line if necessary, and
        return a context-managing iterable over all remaining lines.

        :param file: file name
        :type file: str

        :return: iterable context manager over csv lines
        :rtype: _CsvReadWrapper (iter(str) or iter(dict))
        """
        file_handle = open_maybe_compressed(file, 'rt')
        if self.fieldnames is not None:
            reader = csv.DictReader(file_handle)
        else:
            reader = csv.reader(file_handle)
        # Try to process the file header, if any.
        file_header = None
        try:
            file_header = next(reader)
        except StopIteration:
            pass
        if file_header is not None:
            if self.header is None:
                self.header = file_header
            if file_header != self.header:
                msg = "Inconsistent header: {} != {}".format(
                    file, self.first_file)
                file_handle.close()
                raise ValueError(msg)
        return CsvMergeHandler._CsvReadWrapper(file_handle, reader)

    def getProductAppender(self, file):
        """
        Open a csv file, write the first (header) line, and return a line
        writer that supports the getProductAppender.append calls.

        :param file: file name
        :type file: str

        :return: a file handle that supports the append() call used in
            merge_files_in_memory and merge_files_as_streams.
        :rtype: file-like object
        """
        return CsvMergeHandler._CsvProductAppender(file, self.getHeader,
                                                   self.fieldnames)

    def getHeader(self):
        """
        Return the header for ProductAppenders to reference.

        :return: Header line for the input csv files.
        :rtype: str
        """
        return self.header

    def getCompareKey(self, prod):
        """
        Compute SMILES from a given csv-formatted product.

        :param prod: product in question
        :type prod: dict or list

        :return: SMILES string
        :rtype: str
        """
        if self.dedup_field is not None:
            return prod[self.dedup_field]
        if isinstance(prod, dict):
            # If dedup_field was not provided, we don't know which field
            # (the dict key) is the compare key.
            raise ValueError("Comparison column not specified, cannot "
                             "compute compare key.")
        return prod[0]

    class _CsvReadWrapper:
        """
        Class to combine a csv.reader or csv.DictReader with a context
        manager.
        """

        def __init__(self, file_handle, csv_reader):
            self.file_handle = file_handle
            self.csv_reader = csv_reader

        def __iter__(self):
            return iter(self.csv_reader)

        def __next__(self):
            return next(self.csv_reader)

        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            self.file_handle.close()

    class _CsvProductAppender:
        """
        Class to wrap a file handle with a csv-writing object, and redirect
        append() calls to its write function.
        """

        def __init__(self, file, header_function, fieldnames=None):
            self.header_function = header_function
            self.file_handle = open_maybe_compressed(file, 'wt', newline='')
            self.wrote_header = False
            if fieldnames is None:
                self.csv_writer = csv.writer(self.file_handle)
                header = self.header_function()
                if header is not None:
                    self.csv_writer.writerow(header)
                    self.wrote_header = True
            else:
                self.csv_writer = csv.DictWriter(self.file_handle,
                                                 fieldnames)
                self.csv_writer.writeheader()
                self.wrote_header = True

        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            self.file_handle.__exit__(type, value, tb)

        def append(self, line):
            """
            Write the header if necessary, then append the line.

            :param line: line to be written
            :type line: str
            """
            if not self.wrote_header:
                # The header should be defined by now.
                header = self.header_function()
                assert header is not None
                self.csv_writer.writerow(header)
                self.wrote_header = True
            self.csv_writer.writerow(line)

class StructureMergeHandler(BaseMergeHandler):
    """
    Helper class to bundle structure.Structure IO operations.
    """

    def __init__(self):
        self.smiles_generator = smiles_mod.SmilesGenerator()

    def getProductReader(self, file):
        """
        Create and return a structure reader.

        :param file: structure file name
        :type file: str

        :return: structure reader for file
        :rtype: structure.StructureReader
        """
        return structure.StructureReader(file)

    def getProductAppender(self, file):
        """
        Create and return a structure writer.

        :param file: structure file name
        :type file: str

        :return: structure writer for file
        :rtype: structure.StructureWriter
        """
        return structure.StructureWriter(file)

    def getCompareKey(self, prod):
        """
        Compute the SMILES for a given Schrodinger structure, to compare
        against other structures.

        :param prod: product in question
        :type prod: structure.Structure

        :return: SMILES string
        :rtype: str
        """
        return self.smiles_generator.getSmiles(prod)

class RdkitMergeHandler(BaseMergeHandler):
    """
    Helper class to bundle RDKit SD file IO operations.
    """

    def __init__(self, v3000=False):
        """
        :param bool v3000: If using an RDKit writer and writing an SD file,
            force V3000 format.
        """
        self.v3000 = v3000

    def getProductReader(self, file):
        return Chem.SDMolSupplier(file)

    def getProductAppender(self, file):
        return RdkitMolWriter(file, v3000=self.v3000)

    def getCompareKey(self, prod):
        """
        :param prod: product in question
        :type prod: rdkit.Chem.Mol

        :return: SMILES string
        :rtype: str
        """
        return Chem.MolToSmiles(prod)

class SmiMergeHandler(BaseMergeHandler):
    """
    Helper class to bundle SMILES (.smi) IO operations.
    """

    def getProductReader(self, file):
        """
        Create and return a SMILES line reader.

        :param file: SMILES file name
        :type file: str

        :return: SMILES line reader for file
        :rtype: file-like object (__enter__, __exit__, __iter__)
        """
        return open(file, 'r')

    def getProductAppender(self, file):
        """
        Create and return a SMILES line writer.

        :param file: SMILES file name
        :type file: str

        :return: SMILES line writer for file
        :rtype: _SmilesAppender
        """
        return SmiMergeHandler._SmilesAppender(file)

    def getCompareKey(self, prod):
        """
        Extract the SMILES from a given SMILES line for comparison to other
        SMILES lines.

        :param prod: product in question
        :type prod: str

        :return: SMILES string
        :rtype: str
        """
        return prod.split()[0]

    class _SmilesAppender:
        """
        Wrapper class to redirect append() calls to the standard file
        write() call.
        """

        def __init__(self, file):
            self.handle = open(file, 'w')

        def append(self, product):
            self.handle.write(product)

        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            self.handle.__exit__(type, value, tb)

def get_fieldnames(filenames):
    """
    Return a list with the union of the field names from all the given CSV
    files. The field names are listed in the order in which they were first
    seen. (First all the fields from file #1, then the "new" field names
    from file #2, etc.)

    :param filenames: list of CSV files
    :type filenames: [str]

    :return: list of field names
    :rtype: [str]
    """
    fieldnames = {}
    for fname in filenames:
        with open_maybe_compressed(fname, 'rt') as fin:
            reader = csv.reader(fin)
            try:
                row = next(reader)
            except StopIteration:
                row = []
            fieldnames.update({name: None for name in row})
    return list(fieldnames.keys())

def is_csvgz(filename):
    lcfname = filename.lower()
    return lcfname.endswith('.csv.gz') or lcfname.endswith('.csvgz')


def is_pfx(filename):
    return filename.lower().endswith(PFX)

def get_pfx_size(filename):
    """
    Return the size from the metadata header of a .pfx file.
    """
    with zipfile.ZipFile(filename) as zipfh:
        jsonstr = zipfh.read(METADATA)
        metadata = json.loads(jsonstr)
        return metadata['size']

def extract_structures(filename, dest_file):
    """
    Extract the structures from a .pfx file into a given file.
    """
    with zipfile.ZipFile(filename) as zipfh:
        with open_maybe_compressed(dest_file, 'wb') as fh:
            fh.write(zipfh.read(STRUCTURES))

def remove_react_atom_props(mol):
    """
    Return a copy of `mol` where the atom properties added by the RDKit
    reaction module have been stripped out.

    :param mol: input molecule; not modified
    :type mol: rdkit.Chem.Mol

    :return: modified molecule
    :rtype: rdkit.Chem.Mol
    """
    new_mol = copy.copy(mol)
    react_props = ['react_atom_idx', 'old_mapno']
    for atom in new_mol.GetAtoms():
        for prop in react_props:
            atom.ClearProp(prop)
    return new_mol

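# Example (illustrative sketch; `product_mol` is a hypothetical product of
# an rdkit.Chem.AllChem reaction, which tags atoms with props such as
# "react_atom_idx"):
#
#     clean = remove_react_atom_props(product_mol)
#     Chem.MolToCXSmiles(clean)  # no longer carries per-atom reaction props
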
def cat_csv_files(source_filenames, dest_filename):
    """
    Quick and dirty csv concatenation strategy. Assumes all csv files have
    the same columns and does not deduplicate.

    :param source_filenames: input files
    :param dest_filename: destination file
    """
    with open(dest_filename, 'wb') as fho:
        header = None
        for fname in source_filenames:
            with open_maybe_compressed(fname, 'rb') as fh:
                # Consume the first line, assumed to be the header.
                file_header = next(fh)
                if header is None:
                    header = file_header
                    fho.write(header)
                if file_header != header:
                    raise ValueError(
                        f"Inconsistent header for {fname}: "
                        f"{header} != {file_header}")
                shutil.copyfileobj(fh, fho)

def copy_csv_file(input_file, output_file):
    """
    Copy a compressed or uncompressed input .csv file to another .csv file.
    The output file can also be compressed or uncompressed.

    :param input_file: input file name
    :type input_file: str

    :param output_file: output file name
    :type output_file: str
    """
    with open_maybe_compressed(input_file, 'rb') as f_in:
        with open_maybe_compressed(output_file, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)