Source code for schrodinger.application.phase.packages.conformer_storage

'''
Support for serialization of "multiconformer structures".
'''

import re
import struct
import zlib
from enum import Enum
from functools import partial

import numpy

from schrodinger import structure
from schrodinger.infra import phase


class Format(Enum):
    '''
    Identifiers of the conformer storage formats known to `get_api()`.
    '''
    # Handled by `serialize_compact()` / `deserialize_compact()`.
    COMPACT = 'compact'
    # Handled by `serialize_lossless()` / `deserialize_lossless()`.
    LOSSLESS = 'lossless'
# Packs/unpacks a single value as a C "float" (4 bytes). No byte-order prefix
# is given, so `struct` uses its default (native) layout.
_encoder = struct.Struct('f')

#------------------------------------------------------------------------------#


def _str_to_bytes(s):
    '''
    Encodes `s` as latin-1 (inverse of `_bytes_to_str`).

    :param s: String whose characters are all in the latin-1 range.
    :type s: str

    :return: Encoded bytes.
    :rtype: bytes
    '''
    return s.encode('latin-1')


def _bytes_to_str(b):
    '''
    Decodes `b` as latin-1, which maps every byte value to one character
    (inverse of `_str_to_bytes`).

    :param b: Arbitrary binary data.
    :type b: bytes or bytearray

    :return: Decoded string.
    :rtype: str
    '''
    return b.decode('latin-1')


#------------------------------------------------------------------------------#


def _copy_connectivity(st):
    '''
    Returns new CT that has the same title, connectivity and Lewis
    structure as `st`. For every atom the atom type, atomic number, formal
    charge and color are copied; coordinates of the new atoms are set to
    zero. Nothing else is copied by this function.

    :param st: Structure.
    :type st: `structure.Structure`

    :return: New structure.
    :rtype: `structure.Structure`
    '''

    natom = st.atom_total
    new = structure.create_new_structure(natom)
    new.title = st.title

    for (src, dst) in zip(st.atom, new.atom):
        dst.atom_type = src.atom_type
        dst.atomic_number = src.atomic_number
        dst.formal_charge = src.formal_charge
        dst.color = src.color
        dst.xyz = 0.0, 0.0, 0.0

    bonds = [(b.atom1.index, b.atom2.index, b.order) for b in st.bond]
    new.addBonds(bonds)

    return new


#------------------------------------------------------------------------------#


def _conformers_to_bytes(cts, keep_properties=False):
    '''
    Assumes that CTs are conformers (have same connectivity and
    Lewis structure).

    Layout of the result (every number is one `_encoder` item)::

        number of atoms | number of conformers |
        x, y, z of every atom of every conformer |
        text of the first ("lead") structure

    :param cts: Conformers to serialize (must support indexing and `len()`).
    :type cts: container of `structure.Structure`

    :param keep_properties: Keep properties that would be discarded otherwise.
        If true, the first conformer itself is written as the lead structure;
        otherwise a stripped copy made by `_copy_connectivity` is written.
    :type keep_properties: bool

    :return: Bytes.
    :rtype: bytearray

    :raise RuntimeError: If a conformer has a different number of atoms
        than the lead structure.
    '''

    lead_ct = cts[0] if keep_properties else _copy_connectivity(cts[0])
    lead_ct_str = structure.write_ct_to_string(lead_ct)
    # Shorten zero-valued decimals ("0.000 " -> "0 "); presumably this is
    # done to shrink the text block.
    lead_ct_str = re.sub(r'0\.0+\s', '0 ', lead_ct_str)
    lead_ct_bytes = lead_ct_str.encode()

    num_confs = len(cts)
    num_atoms = lead_ct.atom_total

    # two header items + three coordinates per atom per conformer
    num_float32 = 1 + 1 + 3 * num_confs * num_atoms
    num_bytes = _encoder.size * num_float32 + len(lead_ct_bytes)

    outcome = bytearray(num_bytes)

    offset = 0
    _encoder.pack_into(outcome, offset, num_atoms)
    offset += _encoder.size
    _encoder.pack_into(outcome, offset, num_confs)
    offset += _encoder.size

    for ct in cts:
        if ct.atom_total != num_atoms:
            raise RuntimeError('different number of atoms in a conformer')
        for x in numpy.ravel(ct.getXYZ(copy=False)):
            _encoder.pack_into(outcome, offset, x)
            offset += _encoder.size

    # the remainder of the buffer holds the lead structure text
    outcome[offset:] = lead_ct_bytes

    return outcome


#------------------------------------------------------------------------------#


def _bytes_to_conformers(data):
    '''
    Deserialize conformers serialized by `_conformers_to_bytes`.

    :param data: Bytes.
    :type data: bytearray

    :return: List of the conformers.
    :rtype: list(structure.Structure)

    :raise ValueError: If `data` is too short to hold the two header items
        (atom count and conformer count).
    '''

    # Explicit check instead of `assert`: assertions are removed when Python
    # runs with -O, which would let truncated input through.
    if len(data) < 2 * _encoder.size:
        raise ValueError('serialized conformer data is too short: '
                         f'{len(data)} bytes, expected at least '
                         f'{2 * _encoder.size}')

    offset = 0
    num_atoms = int(_encoder.unpack_from(data, offset)[0])
    offset += _encoder.size
    num_confs = int(_encoder.unpack_from(data, offset)[0])
    offset += _encoder.size

    # the lead structure text follows the coordinates of all conformers
    lead_ct_offset = offset + 3 * _encoder.size * num_atoms * num_confs
    lead_ct_str = data[lead_ct_offset:].decode()
    lead_ct = next(structure.StructureReader.fromString(lead_ct_str))

    outcome = []
    for _ in range(num_confs):
        # every conformer is a copy of the lead structure with its own xyz
        st = lead_ct.copy()
        for atom in st.atom:
            atom.x = _encoder.unpack_from(data, offset)[0]
            offset += _encoder.size
            atom.y = _encoder.unpack_from(data, offset)[0]
            offset += _encoder.size
            atom.z = _encoder.unpack_from(data, offset)[0]
            offset += _encoder.size
        outcome.append(st)

    return outcome


#------------------------------------------------------------------------------#
def serialize_lossless(conformers, keep_properties=False, deflate=True):
    '''
    Converts conformers into a string that `deserialize_lossless()`
    can turn back into structures.

    :param conformers: List of conformer structures.
    :type conformers: list(structure.Structure)

    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool

    :param deflate: Deflate using zlib?
    :type deflate: bool

    :return: Serialized conformers.
    :rtype: str
    '''

    payload = _conformers_to_bytes(conformers, keep_properties=keep_properties)
    if deflate:
        payload = zlib.compress(payload, level=zlib.Z_BEST_COMPRESSION)

    # binary payload is carried in a str (one character per byte)
    return _bytes_to_str(payload)
#------------------------------------------------------------------------------#
def deserialize_lossless(data, inflate=True):
    '''
    Restores the conformers from a string produced by
    `serialize_lossless()`.

    :param data: Serialized conformers.
    :type data: str

    :param inflate: Decompress using zlib? Must match the `deflate` value
        that was used for serialization.
    :type inflate: bool

    :return: Conformer structures.
    :rtype: list(structure.Structure)
    '''

    payload = _str_to_bytes(data)
    if inflate:
        payload = zlib.decompress(payload)

    return _bytes_to_conformers(payload)
#------------------------------------------------------------------------------#
def serialize_compact(conformers, keep_properties=False, deflate=True):
    '''
    Serializes conformers using approach from PHASE-2096.

    :param conformers: Conformer structures. With `keep_properties=False`
        any iterable is accepted (it is traversed exactly once); with
        `keep_properties=True` the object is handed to
        `phase.PhpConformerDeflator.deflate()` unchanged.
    :type conformers: list(structure.Structure)

    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool

    :param deflate: Deflate using zlib?
    :type deflate: bool

    :return: Serialized conformers.
    :rtype: str
    '''

    if keep_properties:
        to_be_deflated = conformers
    else:
        # Build the stripped copies and transfer the coordinates in a single
        # pass. Iterating `conformers` twice (once to copy, once to set the
        # coordinates) would silently leave all coordinates at zero for a
        # one-shot iterable such as a generator.
        to_be_deflated = []
        for src in conformers:
            dst = _copy_connectivity(src)
            dst.setXYZ(src.getXYZ(copy=False))
            to_be_deflated.append(dst)

    deflator = phase.PhpConformerDeflator()
    blob = deflator.deflate(to_be_deflated)

    data = blob.getData()
    if not deflate:
        # blob is zlib-compressed
        data = zlib.decompress(data)

    return _bytes_to_str(data)
#------------------------------------------------------------------------------#
def deserialize_compact(data, inflate=True):
    '''
    Restores the conformers from a string produced by `serialize_compact`.

    :param data: Serialized conformers.
    :type data: str

    :param inflate: Decompress using zlib?
    :type inflate: bool

    :return: Conformer structures.
    :rtype: list(structure.Structure)
    '''

    payload = _str_to_bytes(data)
    if inflate:
        payload = zlib.decompress(payload)

    inflator = phase.PhpConformerInflator(payload,
                                          features=phase.PhpFeatures_OMIT)
    return inflator.getStructureConformers()
#------------------------------------------------------------------------------#
def get_api(fmt, keep_properties=False, compress=True):
    '''
    Returns conformer serializer/deserializer for format `fmt`.

    :param fmt: Desired data format.
    :type fmt: `Format`

    :param keep_properties: Keep properties that would be discarded otherwise.
        Bound to the `keep_properties` argument of the serializer.
    :type keep_properties: bool

    :param compress: Apply zlib compression. Bound to `deflate` of the
        serializer and to `inflate` of the deserializer.
    :type compress: bool

    :return: Couple of callables to serialize/deserialize conformers.
    :rtype: (iterable(structure.Structure) -> str,
             str -> list(structure.Structure))

    :raise ValueError: If `fmt` is not a known format.
    '''

    API = {
        Format.COMPACT: (serialize_compact, deserialize_compact),
        Format.LOSSLESS: (serialize_lossless, deserialize_lossless)
    }

    # Only the lookup can raise KeyError, so only the lookup is guarded.
    try:
        serialize, deserialize = API[fmt]
    except KeyError:
        # the KeyError adds nothing to the message, hence "from None"
        raise ValueError(f'invalid conformer storage format {fmt}') from None

    return (partial(serialize,
                    keep_properties=keep_properties,
                    deflate=compress), partial(deserialize, inflate=compress))
#------------------------------------------------------------------------------#