import contextlib
import os
from schrodinger import shape
# Shape data treatment
COPY_BINARY = "copy"
REMOTE_BINARY = "remote"
# -----------------------------------------------------------------------------
def _raise_error(errno, message):
'''
Raises `RuntimeError` using the `message` and error string for `errno`.
:param errno: Error code.
:type errno: int
:param message: Error message prefix.
:type message: str
'''
reason = os.strerror(errno)
if reason:
message += f': {reason}'
raise RuntimeError(message)
# -----------------------------------------------------------------------------
def _close_reader_or_writer(instance, filename):
'''
Closes shape binary file reader/writer.
:param instance: Shape binary file reader or writer.
:type instance: `schrodinger.shape.ShapeBinaryFileReader` or
`schrodinger.shape.ShapeBinaryFileWriter`
:param filename: File name (for error message).
:type filename: str
:raises RuntimeError: If `instance.close()` fails.
'''
if not instance.close():
_raise_error(instance.getErrno(), f"could not close '{filename}'")
# -----------------------------------------------------------------------------
[docs]class ShapeFileWriter(contextlib.AbstractContextManager):
[docs] def __init__(self, filename, metadata, compress):
'''
:param filename: File name (expected to exist if metadata is empty).
:type filename: str
:param metadata: Text to be stored as file-scope "metadata". If empty,
"append" opening mode is assumed. If not empty, existing file
with the same name will be truncated.
:type metadata: str
:param compress: Compress the file using zlib.
:type compress: bool
'''
self._writer = shape.ShapeBinaryFileWriter(filename, metadata, compress)
self._filename = filename
self._check('could not ' + ('open' if metadata else 'create') +
f" '{filename}'")
def __exit__(self, exc_type, exc_value, traceback):
_close_reader_or_writer(self._writer, self._filename)
def _check(self, message=None):
'''
Raises `RuntimeError` if `self._writer` is not valid.
:param message: Error message prefix.
:type message: str
'''
if not self._writer.valid():
if message is None:
message = f"'{self._filename}': invalid writer"
_raise_error(self._writer.getErrno(), message)
[docs] def append(self, title, shapes):
'''
Appends `shapes` to the associated binary file.
:param title: Title of the shapes.
:type title: str
:param shapes: List of `shape.ShapeStructure` instances (may
have different number of spheres).
:type shapes: list(shape.ShapeStructure)
'''
self._writer.write(title, shapes)
self._check(f"'{self._filename}': write failed")
# -----------------------------------------------------------------------------
[docs]class ShapeFileReader(contextlib.AbstractContextManager):
[docs] def __init__(self, filename, block_size=10000):
'''
:param filename: File name.
:type filename: str
:param block_size: Number of (title, structures) entries to read
in one scoop.
:type block_size: int
'''
self._reader = shape.ShapeBinaryFileReader(filename)
self._filename = filename
self._block_size = block_size
if self._reader.closed():
_raise_error(self._reader.getErrno(),
f"could not open '{filename}'")
def __exit__(self, exc_type, exc_value, traceback):
_close_reader_or_writer(self._reader, self._filename)
@property
def metadata(self):
'''
File-scope metadata from the associated "shape binary" file.
'''
return self._reader.metadata
def __iter__(self):
'''
Yields blocks of named blocks of shape structures.
:return: Blocks of shapes and their titles. The `titles` is
a vector of (name, number-of-shape-structures) pairs, and
`shapes` is a vector of shape.ShapeStructure instances.
len(shapes) == titles[0][1] + titles[1][1] + ... + titles[N][1]
:rtype: (shape.ShapeStructureVector, shape.ShapeBlockTitleVector)
'''
shapes = shape.ShapeStructureVector()
titles = shape.ShapeBlockTitleVector()
while not self._reader.closed():
yield self.getBlock(shapes, titles)
[docs] def getBlock(self, shapes=None, titles=None):
'''
Reads next block of shapes and their titles. Returns (None, None)
on EOF, raises RuntimeError on errors. Uses memory associated
with `shapes` and/or `titles` unless the parameter is None.
:param shapes: SWIG-wrapped c++ std::vector<shape::ShapeStructure>.
:type shapes: shape.ShapeStructureVector
:param titles: SWIG-wrapped c++
std::vector<std::pair<std::string, size_t>>.
:type titles: shape.ShapeBlockTitleVector
:return: Blocks of shapes and their titles. The `titles` is
a vector of (name, number-of-shape-structures) pairs, and
`shapes` is a vector of shape.ShapeStructure instances.
len(shapes) == titles[0][1] + titles[1][1] + ... + titles[N][1]
:rtype: (shape.ShapeStructureVector, shape.ShapeBlockTitleVector)
'''
if self._reader.closed():
return (None, None)
if shapes is None:
shapes = shape.ShapeStructureVector()
if titles is None:
titles = shape.ShapeBlockTitleVector()
if not self._reader.readMultiple(self._block_size, titles, shapes):
errno = self._reader.getErrno()
self._reader.close()
if errno:
_raise_error(errno, f"{self._filename}: read error")
return (shapes, titles)
# -----------------------------------------------------------------------------
[docs]class SingularShapeFileReader(ShapeFileReader):
[docs] def __init__(self, filename):
super().__init__(filename)
def __iter__(self):
'''
Yields named blocks of shape structures.
:return: Title and shapes.
:rtype: (str, shape.ShapeStructureVector)
'''
shapes = shape.ShapeStructureVector()
ok, title = self._reader.read(shapes)
while ok:
yield (title, shapes)
ok, title = self._reader.read(shapes)
errno = self._reader.getErrno()
self._reader.close()
if errno:
_raise_error(errno, f"{self._filename}: read error")
# -----------------------------------------------------------------------------