Source code for schrodinger.structutils.block_data

"""
Access non-scalar structured data stored on a Structure.

Copyright Schrodinger, LLC. All rights reserved.
"""

import contextlib

from schrodinger.infra import mm


def _get_unrequested_handle(st):
    """
    Return the unrequested m2io data handle for the given structure; or None
    if only additional data is available.

    :param st: Input structure
    :type st: structure.Structure

    :return: m2io data handle (or None)
    :rtype: int or None
    """
    try:
        ur_handle = mm.mmct_ct_m2io_get_unrequested_handle(st.handle)
    except mm.MmException as err:
        # mmct will return MMCT_WARNING if unrequested handle is not present
        if err.rc == mm.MMCT_WARNING:
            return None
        else:
            raise  # Re-raise exceptions of type MMCT_ERROR
    else:
        return ur_handle


[docs]def get_blocks(st, truncate=False): """ Return a dict representation of data blocks in given a Structure object. A block is a dict, and subblocks are stored in a list which is a value within the parent block's dict. This includes data in the both the unrequested and additiona data handles. :param st: structure object or mmct handle :type st: structure.Structure or int :param truncate: whether to truncate multiple instances of the same blocks as a single dict containing only the first subblock, or return all repeated blocks as a list of dicts :type truncate: bool """ ur_handle = _get_unrequested_handle(st) if ur_handle is not None: unrequested_data = _get_blocks(ur_handle, truncate) else: unrequested_data = {} # Get an open handle if available, otherwise open a new one ad_handle = mm.mmct_ct_get_or_open_additional_data(st, True) additional_data = _get_blocks(ad_handle, truncate) # Return combined data, where additional can overwrite any unrequested return {**unrequested_data, **additional_data}
[docs]def write_blocks(st, data, truncate=False): """ NOTE: This is provided for working with legacy products only; please use structure properties instead of introducing new m2io data blocks. Inverse function to get_blocks(). Writes a dict representation of data blocks to a Structure object. Top-level blocks that already exist in the Structure object and are given in the input dict representation will be overwritten. If you wish to merge some blocks instead of overwriting them, try exporting existing data blocks and merging it with the desired blocks, and then pass the merged dictionary to write_blocks. Automatic merge is not provided as the operation is ambiguous and may be product-dependent. Note that read_blocks()- write_blocks()-read_blocks() cycle performed on a structure may return error if there are 'None' values in the data blocks. To circumvent the problem, after write_blocks() try writing structure to a string and reading it back. Afterwards you may safely perform read_blocks(). :param st: structure object or mmct handle :type st: structure.Structure or int :param data: data structure containing data blocks in the same format as returned by get_blocks() :type data: dict :param truncate: controls whether the data provided is in truncated representation, as described in read_blocks() :type truncate: boolean """ ad_handle = mm.mmct_ct_get_or_open_additional_data(st, True) _delete_existing_blocks(ad_handle, data) ur_handle = _get_unrequested_handle(st) if ur_handle is not None: _delete_existing_blocks(ur_handle, data) if truncate: _write_blocks(ad_handle, data) else: _write_blocks_as_lists_of_dicts(ad_handle, data)
[docs]def append_row_to_data(data, table_name, row_object): """ Helper function to present a table-like interface for interacting with blocks. Handles the matching of object properties to block columns, and raises an exception if the object doesn't contain all table columns. :param data: data object to be written to the maestro block. If no table with the specified name is present, it creates one. :type data: dict :param row_object: object corresponding to the new row to be appended :type row_object: dict :param table_name: name of the table to be appended. :type table_name: string """ if table_name not in data: data[table_name] = {} for column_name in row_object: data[table_name][column_name] = [] column_objects = data[table_name] if len(column_objects) != len(row_object): raise ValueError(f"Row mismatch: the row has {len(row_object)} columns" f"whereas the object has {len(column_objects)}.") for column_name in column_objects.keys(): if column_name not in row_object: raise ValueError(f"Row mismatch: row missing column {column_name}") column_value = row_object[column_name] column_objects[column_name].append(column_value)
def _delete_existing_blocks(handle, data): for key in data.keys(): if mm.m2io_inquire_block_name(handle, key): mm.m2io_delete_named_block(handle, key) def _get_blocks(handle, truncate): data = {} _get_props(handle, data) if truncate: _get_blocks_as_dicts(handle, data) else: _get_blocks_as_lists_of_dicts(handle, data) return data @contextlib.contextmanager def _m2io_goto_block(handle, block, iblock): mm.m2io_goto_block(handle, block, iblock) try: yield finally: mm.m2io_leave_block(handle) def _get_blocks_as_dicts(handle, data): for block in mm.m2io_get_block_names(handle, 1): with _m2io_goto_block(handle, block, 1): data[block] = _get_blocks(handle, truncate=True) def _write_blocks(handle, data): for key, block in data.items(): if isinstance(block, dict): mm.m2io_open_block(handle, key) _write_blocks(handle, block) mm.m2io_close_block(handle) else: if isinstance(block, list): mm.m2io_set_index_dimension(handle, len(block)) for inx in range(0, len(block)): _write_prop_idx(handle, key, block[inx], inx + 1) else: _write_prop(handle, key, block) def _write_blocks_as_lists_of_dicts(handle, data): for key, block in data.items(): if not isinstance(block, list): _write_prop(handle, key, block) elif not isinstance(block[0], dict): mm.m2io_set_index_dimension(handle, len(block)) for inx in range(0, len(block)): _write_prop_idx(handle, key, block[inx], inx + 1) else: for iblock in block: mm.m2io_open_block(handle, key) _write_blocks_as_lists_of_dicts(handle, iblock) mm.m2io_close_block(handle) def _get_blocks_as_lists_of_dicts(handle, data): for block in mm.m2io_get_block_names(handle, 1): block_count = mm.m2io_get_number_blocks(handle, block) block_list = [] for iblock in range(1, block_count + 1): with _m2io_goto_block(handle, block, iblock): block_list.append(_get_blocks(handle, truncate=False)) data[block] = block_list def _get_props(handle, data): try: dim = mm.m2io_get_index_dimension(handle) except mm.MmException: dim = 0 for key in mm.m2io_get_data_names(handle, -1): if dim > 0: data[key] = [] for i in range(1, dim + 1): data[key].append(_get_prop_idx(handle, key, i)) else: data[key] = _get_prop(handle, key) def _write_prop(handle, name, val): # return the value of a scalar property given an unrequested handle if val is None: mm.m2io_set_no_value(handle, [name]) elif name.startswith("r_"): _m2io_put(mm.m2io_put_real, handle, name, val) elif name.startswith("i_"): _m2io_put(mm.m2io_put_int, handle, name, val) elif name.startswith("b_"): _m2io_put(mm.m2io_put_boolean, handle, name, val) elif name.startswith("s_"): _m2io_put(mm.m2io_put_string, handle, name, val) else: raise ValueError("Invalid property type: '%s'" % name[0]) def _write_prop_idx(handle, name, val, index): # return the value of a scalar property given an unrequested handle if name.startswith("r_"): _m2io_put(mm.m2io_put_real_indexed, handle, name, val, index) elif name.startswith("i_"): _m2io_put(mm.m2io_put_int_indexed, handle, name, val, index) elif name.startswith("b_"): _m2io_put(mm.m2io_put_boolean_indexed, handle, name, val, index) elif name.startswith("s_"): _m2io_put(mm.m2io_put_string_indexed, handle, name, val, index) else: raise ValueError("Invalid property type: '%s'" % name[0]) def _get_prop(handle, name): # return the value of a scalar property given an unrequested handle if name.startswith("r_"): return _m2io_get(mm.m2io_get_real, handle, name) elif name.startswith("i_"): return _m2io_get(mm.m2io_get_int, handle, name) elif name.startswith("b_"): return _m2io_get(mm.m2io_get_boolean, handle, name) elif name.startswith("s_"): return _m2io_get(mm.m2io_get_string, handle, name) raise ValueError("Invalid property type: '%s'" % name) def _get_prop_idx(handle, name, index): # return the value of an array element property given an unrequested handle if name.startswith("r_"): return _m2io_get(mm.m2io_get_real_indexed, handle, name, index) elif name.startswith("i_"): return _m2io_get(mm.m2io_get_int_indexed, handle, name, index) elif name.startswith("b_"): return _m2io_get(mm.m2io_get_boolean_indexed, handle, name, index) elif name.startswith("s_"): return _m2io_get(mm.m2io_get_string_indexed, handle, name, index) raise ValueError("Invalid property type: '%s'" % name) def _m2io_put(m2io_put_func, handle, name, value, index=None): try: if index is None: m2io_put_func(handle, [name], [value]) else: m2io_put_func(handle, index, [name], [value]) except Exception as e: print(e) def _m2io_get(m2io_get_func, handle, name, index=None): try: if index is None: return m2io_get_func(handle, [name]).pop() else: return m2io_get_func(handle, index, [name]).pop() except mm.MmException as exc: if exc.rc == mm.M2IO_NO_VALUE: return None raise exc