# Source code for schrodinger.pipeline.pipeutils

"""
Shared functions for Pipeline stages.

Copyright Schrodinger, LLC. All rights reserved.

"""
# Contributors: Matvey Adzhigirey

import os
import sys
from past.utils import old_div

from schrodinger import structure
from schrodinger.utils import fileutils


def countRoots(ligfiles, unique_field="s_m_title"):
    """
    Count the number of structures and compounds in the supplied files.

    Compounds are identified by the 'unique_field' property, and all
    structures that share the same 'unique_field' value are considered
    variants of the same compound. When 'unique_field' is "title" or
    "s_m_title", the structure's ``title`` attribute is used instead of a
    property lookup.

    A "." is written to stdout after every 1000 structures as a progress
    indicator.

    :param ligfiles: Iterable of paths of the structure files to scan.
    :param unique_field: Name of the property that identifies a compound.

    :return: Tuple of (total number of structures, i.e. variants;
             total number of distinct compounds).

    :raises RuntimeError: If a reader can not be created for a file, or if
        a structure is missing its title / the 'unique_field' property.
    """
    unique_field_is_title = unique_field in ("title", "s_m_title")

    # Only the number of distinct roots is reported, so a set is sufficient.
    ligand_roots = set()
    st_num = 0
    for ligfile in ligfiles:
        # "except Exception" (rather than a bare except) so that
        # KeyboardInterrupt/SystemExit are not converted to RuntimeError.
        try:
            sts = structure.StructureReader(ligfile)
        except Exception:
            raise RuntimeError("Could not read file:" + ligfile)

        for st in sts:
            st_num += 1
            if st_num % 1000 == 0:
                sys.stdout.write(".")
                sys.stdout.flush()

            if unique_field_is_title:
                try:
                    root = st.title
                except Exception:
                    raise RuntimeError("A ligand in file " + ligfile +
                                       " is missing a title!")
            else:
                try:
                    root = st.property[unique_field]
                except Exception:
                    raise RuntimeError("No field " + unique_field +
                                       " in file " + ligfile + " ligand " +
                                       str(st.title) + "!")
            ligand_roots.add(root)

    return (st_num, len(ligand_roots))
class BackwardsReader(object):
    """
    Read a file line by line, backwards.

    Takes in a file path; iterating over the instance yields the lines of
    the file from last to first, without their trailing newline. The file
    is read in blocks of BLKSIZE from the end towards the start.

    Empty lines are skipped, with one exception: the leading segment of each
    block is carried over to the next block and is yielded without the
    emptiness check (so e.g. a file that starts with a newline yields an
    empty string last).

    The underlying file handle is opened by the constructor. It can be
    released deterministically with close() or by using the instance as a
    context manager; otherwise it is closed when the object is destroyed.

    NOTE(review): the file is opened in text mode but positioned with
    byte offsets computed from the file size; this presumably is only
    reliable for single-byte encodings without newline translation -
    confirm before using on non-ASCII logs.
    """
    BLKSIZE = 4096

    def __init__(self, filename):
        """
        :param filename: Path of the file to read.
        """
        # Define the attribute before open() so that close()/__del__ are
        # safe even if open() raises.
        self.fh = None
        self.fh = open(filename, "r")  # Ev:134557
        self.buf = ""
        self.fh.seek(0, os.SEEK_END)
        self.file_size = self.fh.tell()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __iter__(self):
        offset_from_start = self.file_size
        delta = 0
        # Leading (possibly partial) segment of the most recently read
        # block; it is completed by the tail of the next block read.
        line = None
        while offset_from_start > 0:
            delta = min(self.file_size, delta + self.BLKSIZE)
            self.fh.seek(self.file_size - delta)
            toread = min(offset_from_start, self.BLKSIZE)
            self.buf = self.fh.read(toread)
            offset_from_start -= self.BLKSIZE
            lines = self.buf.split('\n')
            if line is not None:
                # endswith() instead of buf[-1] so that an empty read can
                # not raise IndexError.
                if not self.buf.endswith('\n'):
                    # The carried segment continues this block's last line.
                    lines[-1] += line
                else:
                    # The carried segment was already a complete line.
                    yield line
            line = lines[0]
            for idx in range(len(lines) - 1, 0, -1):
                if len(lines[idx]) > 0:
                    yield lines[idx]
        if line is not None:
            yield line

    def close(self):
        """
        Close the underlying file handle. Safe to call more than once.
        """
        fh = getattr(self, "fh", None)
        if fh is not None:
            fh.close()

    def __del__(self):
        # The instance may be only partially initialized (e.g. open()
        # failed), so do not assume that self.fh exists.
        self.close()
def get_last_20_lines(logfile):
    """
    Given a log file, return a string with the last 20 lines of it.

    The returned message consists of a header naming the file, a separator
    line, up to 20 lines from the end of the file (in file order, as
    produced by BackwardsReader), and a closing separator line. If the path
    does not exist, " NO LOG FILE" is reported in place of the lines.

    :param logfile: Path of the log file.

    :return: The formatted multi-line message.
    """
    separator = "******************************************************************************\n"
    parts = ["    Last 20 lines of %s:\n" % logfile, separator]
    if os.path.exists(logfile):
        reader = BackwardsReader(logfile)
        tail = []
        try:
            for line in reader:
                tail.append(line)
                if len(tail) == 20:
                    break
        finally:
            # Release the file handle now instead of waiting for the
            # reader to be garbage collected.
            reader.fh.close()
        # Lines were collected last-to-first; restore file order.
        tail.reverse()
        parts.extend(line + '\n' for line in tail)
    else:
        parts.append("    NO LOG FILE\n")
    parts.append(separator)
    return "".join(parts)
class DotPrinter:
    """
    Progress indicator that writes a period to stdout every N calls and,
    when the total count is known, a percentage as well.

    The percentage is written after a dot when at least 10 dots were
    printed since the last percentage and the percentage has increased, or
    when the call count has reached the total.

    Example::

        dp = DotPrinter(total_sts)
        for st in sr:
            dp.dot()
    """

    def __init__(self, total_sts=None, every=1000):
        """
        :param total_sts: Total number of structures, if known.
        :param every: Print a dot every this many calls to dot().
        """
        self._curr_st = 0  # Number of dot() calls so far
        self._num_dots = 0  # Dots written since the last percentage
        self.prev_percent = 0
        self._every = every
        self._total_sts = total_sts

    def dot(self):
        """
        Register one more processed structure and print progress if due.

        :return: The number of times dot() was called so far.
        """
        self._curr_st += 1
        count = self._curr_st
        if count % self._every != 0:
            return count

        out = sys.stdout
        out.write(".")
        total = self._total_sts
        if total:
            # The total is known, so a percentage can be reported.
            self._num_dots += 1
            percent = int(count * 100 / total)
            finished = count == total
            progressed = percent > self.prev_percent and self._num_dots >= 10
            if progressed or finished:
                out.write(str(percent) + "%")
                self.prev_percent = percent
                self._num_dots = 0
        out.flush()
        return count
# Ev:96648 & Ev: 104439
def read_unique_field(st, uniquefield):
    """
    Return the value of the specified property for the specified st
    (converted to string).

    If the property can not be read, the same property name is tried with
    the other type prefixes, in the order string ('s'), int ('i') and
    float ('r'), by replacing the first character of 'uniquefield'. If none
    of them is available, the exception from the original lookup is
    re-raised.

    :param st: Object whose ``property`` mapping is queried.
    :param uniquefield: Name of the property to read.

    :return: The property value converted with str().

    :raises Exception: Whatever exception the lookup of 'uniquefield' itself
        raised, if no variant of the property could be read.
    """
    try:
        return str(st.property[uniquefield])
    except Exception as err:
        orig_exception = err

    for type_prefix in ("s", "i", "r"):
        alt_field = type_prefix + uniquefield[1:]
        if alt_field == uniquefield:
            # Already tried above.
            continue
        # "except Exception" rather than a bare except, so that
        # KeyboardInterrupt/SystemExit are not silently discarded.
        try:
            return str(st.property[alt_field])
        except Exception:
            continue

    raise orig_exception
def get_reader(filename, astext=False, sd_for_unknown=True, support_smi=True):
    """
    Return a reader object for the given structure file, chosen by the file
    format that fileutils detects for 'filename'.

    :param filename: Path of the structure file to read.

    :param astext: Return a MaestroTextReader instance if the specified
        file is a Maestro file and astext is True.
    :type astext: bool

    :param sd_for_unknown: Whether to treat files whose format is not
        recognized as SD.
    :type sd_for_unknown: bool

    :param support_smi: Whether to support SMILES and SMILESCSV formats
        (via SmilesReader and SmilesCsvReader respectively).
    :type support_smi: bool

    :return: MaestroTextReader, SmilesReader, SmilesCsvReader or
        StructureReader instance for the file.
    """
    detected = fileutils.get_structure_file_format(filename)
    if sd_for_unknown and not detected:
        # Unknown files are to be opened as SD.
        # NOTE(review): this fallback only changes the local format value;
        # StructureReader below is still given just the filename - confirm
        # that it opens unknown extensions as SD on its own.
        detected = fileutils.SD

    if astext and detected == fileutils.MAESTRO:
        return structure.MaestroTextReader(filename)

    if support_smi:
        if detected == fileutils.SMILES:
            return structure.SmilesReader(filename)
        if detected == fileutils.SMILESCSV:
            return structure.SmilesCsvReader(filename)

    return structure.StructureReader(filename)
# EOF