# Source code for schrodinger.application.phase.packages.conformer_storage
'''
Support for serialization of "multiconformer structures".
'''
import re
import struct
import zlib
from enum import Enum
from functools import partial
import numpy
from schrodinger import structure
from schrodinger.infra import phase
# Shared packer/unpacker for a single C `float`. It is used below both for the
# two header counts and for every coordinate. The format string has no
# byte-order prefix, so `struct` uses the native byte order of the machine
# running this code.
_encoder = struct.Struct('f') # "f" => "float", 4 bytes
#------------------------------------------------------------------------------#
def _str_to_bytes(s):
return s.encode('latin-1')
def _bytes_to_str(b):
return b.decode('latin-1')
#------------------------------------------------------------------------------#
def _copy_connectivity(st):
    '''
    Builds a new CT with the same title, connectivity and Lewis
    structure as `st`. Only the title, the per-atom attributes assigned
    below and the bonds are transferred; all coordinates are set to the
    origin.

    :param st: Structure to copy from.
    :type st: `structure.Structure`

    :return: Newly created structure.
    :rtype: `structure.Structure`
    '''
    skeleton = structure.create_new_structure(st.atom_total)
    skeleton.title = st.title

    for (original, replica) in zip(st.atom, skeleton.atom):
        replica.atom_type = original.atom_type
        replica.atomic_number = original.atomic_number
        replica.formal_charge = original.formal_charge
        replica.color = original.color
        replica.xyz = (0.0, 0.0, 0.0)

    # (first atom index, second atom index, bond order) for every bond.
    bond_specs = []
    for bond in st.bond:
        bond_specs.append((bond.atom1.index, bond.atom2.index, bond.order))
    skeleton.addBonds(bond_specs)

    return skeleton
#------------------------------------------------------------------------------#
def _conformers_to_bytes(cts, keep_properties=False):
    '''
    Packs conformers into a single buffer. The CTs are assumed to be
    conformers of one another (same connectivity and Lewis structure);
    only the atom count of each one is checked.

    Buffer layout, in order:

    * number of atoms, stored as one `_encoder` float;
    * number of conformers, stored as one `_encoder` float;
    * x, y, z of every atom of every conformer, one float each;
    * text of the lead CT (first conformer), encoded with `str.encode()`.

    :param cts: Conformers to serialize.
    :type cts: container of `structure.Structure`

    :param keep_properties: If true, the first conformer itself is written
        as the lead CT; otherwise its `_copy_connectivity()` copy is used.
    :type keep_properties: bool

    :return: Bytes.
    :rtype: bytearray

    :raises RuntimeError: If a conformer has a different number of atoms
        than the lead CT.
    '''
    first = cts[0]
    template = first if keep_properties else _copy_connectivity(first)

    template_text = structure.write_ct_to_string(template)
    # Shorten every "0.0...0" followed by whitespace to "0 " to save space.
    template_text = re.sub(r'0\.0+\s', '0 ', template_text)
    template_bytes = template_text.encode()

    conf_count = len(cts)
    atom_count = template.atom_total
    width = _encoder.size

    # Two header values plus three coordinates per atom per conformer.
    numeric_bytes = width * (1 + 1 + 3 * conf_count * atom_count)
    buf = bytearray(numeric_bytes + len(template_bytes))

    _encoder.pack_into(buf, 0, atom_count)
    _encoder.pack_into(buf, width, conf_count)
    pos = 2 * width

    for ct in cts:
        if ct.atom_total != atom_count:
            raise RuntimeError('different number of atoms in a conformer')
        for value in numpy.ravel(ct.getXYZ(copy=False)):
            _encoder.pack_into(buf, pos, value)
            pos += width

    # Whatever follows the numeric part is the lead CT text.
    buf[pos:] = template_bytes
    return buf
#------------------------------------------------------------------------------#
def _bytes_to_conformers(data):
    '''
    Rebuilds the conformers from a buffer produced by
    `_conformers_to_bytes`.

    The two leading floats give the atom and conformer counts. The lead
    CT text that follows the coordinates is parsed once and then copied
    for every conformer, with the stored coordinates assigned atom by
    atom.

    :param data: Bytes.
    :type data: bytearray

    :return: List of the conformers.
    :rtype: list(structure.Structure)
    '''
    width = _encoder.size
    assert len(data) >= 2 * width

    (atom_field,) = _encoder.unpack_from(data, 0)
    (conf_field,) = _encoder.unpack_from(data, width)
    atom_count = int(atom_field)
    conf_count = int(conf_field)

    cursor = 2 * width  # start of the coordinates

    # The lead CT text starts right after the last coordinate.
    text_start = cursor + 3 * width * atom_count * conf_count
    template_text = data[text_start:].decode()
    template = next(structure.StructureReader.fromString(template_text))

    conformers = []
    for _ in range(conf_count):
        conformer = template.copy()
        for atom in conformer.atom:
            (atom.x,) = _encoder.unpack_from(data, cursor)
            (atom.y,) = _encoder.unpack_from(data, cursor + width)
            (atom.z,) = _encoder.unpack_from(data, cursor + 2 * width)
            cursor += 3 * width
        conformers.append(conformer)

    return conformers
#------------------------------------------------------------------------------#
def serialize_lossless(conformers, keep_properties=False, deflate=True):
    '''
    Serializes conformers into a string that `deserialize_lossless()`
    can read back.

    The conformers are packed by `_conformers_to_bytes()`, optionally
    compressed with zlib at its highest compression level, and the
    resulting bytes are returned as latin-1 text.

    :param conformers: List of conformer structures.
    :type conformers: list(structure.Structure)

    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool

    :param deflate: Deflate using zlib?
    :type deflate: bool

    :return: Serialized conformers.
    :rtype: str
    '''
    payload = _conformers_to_bytes(conformers, keep_properties=keep_properties)
    if deflate:
        payload = zlib.compress(payload, level=zlib.Z_BEST_COMPRESSION)
    return _bytes_to_str(payload)
#------------------------------------------------------------------------------#
def deserialize_lossless(data, inflate=True):
    '''
    Restores the conformers from a string produced by
    `serialize_lossless()`.

    `inflate` must match the `deflate` value used for serialization.

    :param data: Serialized conformers.
    :type data: str

    :param inflate: Decompress using zlib?
    :type inflate: bool

    :return: Conformer structures.
    :rtype: list(structure.Structure)
    '''
    raw = _str_to_bytes(data)
    if inflate:
        raw = zlib.decompress(raw)
    return _bytes_to_conformers(raw)
#------------------------------------------------------------------------------#
def serialize_compact(conformers, keep_properties=False, deflate=True):
    '''
    Serializes conformers using approach from PHASE-2096
    (via `phase.PhpConformerDeflator`).

    Unless `keep_properties` is set, each conformer is replaced by its
    `_copy_connectivity()` copy carrying the original coordinates before
    being handed to the deflator.

    :param conformers: List of conformer structures.
    :type conformers: list(structure.Structure)

    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool

    :param deflate: Deflate using zlib?
    :type deflate: bool

    :return: Serialized conformers.
    :rtype: str
    '''
    if keep_properties:
        payload = conformers
    else:
        payload = []
        for source in conformers:
            stripped = _copy_connectivity(source)
            stripped.setXYZ(source.getXYZ(copy=False))
            payload.append(stripped)

    blob = phase.PhpConformerDeflator().deflate(payload)
    packed = blob.getData()
    if not deflate:
        # blob is zlib-compressed
        packed = zlib.decompress(packed)
    return _bytes_to_str(packed)
#------------------------------------------------------------------------------#
def deserialize_compact(data, inflate=True):
    '''
    Restores the conformers from a string produced by
    `serialize_compact`.

    The text is turned back into bytes, optionally decompressed with
    zlib, and passed to `phase.PhpConformerInflator` constructed with
    `features=phase.PhpFeatures_OMIT`.

    :param data: Serialized conformers.
    :type data: str

    :param inflate: Decompress using zlib?
    :type inflate: bool

    :return: Conformer structures.
    :rtype: list(structure.Structure)
    '''
    raw = _str_to_bytes(data)
    if inflate:
        raw = zlib.decompress(raw)
    reader = phase.PhpConformerInflator(raw, features=phase.PhpFeatures_OMIT)
    return reader.getStructureConformers()
#------------------------------------------------------------------------------#
def get_api(fmt, keep_properties=False, compress=True):
    '''
    Returns conformer serializer/deserializer for format `fmt`, with the
    `keep_properties` and `compress` choices already bound.

    :param fmt: Desired data format.
    :type fmt: `Format`

    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool

    :param compress: Apply zlib compression.
    :type compress: bool

    :return: Couple of callables to serialize/deserialize conformers.
    :rtype: (iterable(structure.Structure) -> str, str -> list(structure.Structure))

    :raises ValueError: If `fmt` is not one of the supported formats.
    '''
    # NOTE(review): `Format` is not defined in the visible part of this
    # module; presumably an Enum declared elsewhere in the file.
    codecs_by_format = {
        Format.COMPACT: (serialize_compact, deserialize_compact),
        Format.LOSSLESS: (serialize_lossless, deserialize_lossless)
    }

    try:
        (to_str, from_str) = codecs_by_format[fmt]
    except KeyError:
        raise ValueError(f'invalid conformer storage format {fmt}')

    writer = partial(to_str, keep_properties=keep_properties, deflate=compress)
    reader = partial(from_str, inflate=compress)
    return (writer, reader)
#------------------------------------------------------------------------------#