# Source code for schrodinger.utils.multifpfile

'''
SQLite3 DB to hold several kinds of fingerprints associated
with the same structures.
'''

import json
import os
import re
import sqlite3
import zlib

from schrodinger.infra import canvas
from schrodinger.infra import phase
from schrodinger.utils import fileutils

#------------------------------------------------------------------------------#

# Name of the SQLite table that holds one row of fingerprints per molecule
# (a `mol_id` column followed by one BLOB column per fingerprint type).
FINGERPRINTS_TABLE = 'Fingerprints'

# yapf: disable
# Maps `phase` fingerprint type constants to the fingerprint type names used
# in this module; the names also serve as column names in the table above.
FP_TYPE_TO_NAME = {
    phase.FP_TYPE_DENDRITIC: 'dendritic',
    phase.FP_TYPE_FDENDRITIC: 'fdendritic',
    phase.FP_TYPE_LINEAR: 'linear',
    phase.FP_TYPE_MACCS: 'maccs',
    phase.FP_TYPE_MOLPRINT2D: 'molprint2D',
    phase.FP_TYPE_RADIAL: 'radial',
    phase.FP_TYPE_ECFP4: 'ecfp4',
    phase.FP_TYPE_ECFP6: 'ecfp6',
    phase.FP_TYPE_FCFP4: 'fcfp4',
    phase.FP_TYPE_FCFP6: 'fcfp6'
}
# yapf: enable

# Inverse of `FP_TYPE_TO_NAME`: fingerprint type name -> `phase` constant.
FP_NAME_TO_TYPE = {v: k for k, v in FP_TYPE_TO_NAME.items()}

#------------------------------------------------------------------------------#


def make_fp_generator(fp_type):
    '''
    Creates the Canvas fingerprint generator for the given fingerprint type.

    Repeats code from `createFpGenerators()` (see phase_database.cpp).

    :param fp_type: Fingerprint type: either a numeric type ID (anything
        accepted by `int()`), or a type name from `FP_NAME_TO_TYPE`.
        Names are matched exactly first, then case-insensitively.
    :type fp_type: `phase.PhpFpType` or `str`

    :return: Fingerprint generator.
    :rtype: `canvas.ChmFPOut32`

    :raises KeyError: If the fingerprint type (name or ID) is not supported.
    '''

    try:
        type_id = int(fp_type)
    except ValueError:
        try:
            type_id = FP_NAME_TO_TYPE[fp_type]
        except KeyError:
            # `_parse_fingerprints_schema()` lower-cases the names it reads
            # back from a file (e.g. 'molprint2d'), so also accept names
            # that differ from the canonical spelling only by case.
            wanted = fp_type.lower()
            for name, known_id in FP_NAME_TO_TYPE.items():
                if name.lower() == wanted:
                    type_id = known_id
                    break
            else:
                raise KeyError(fp_type) from None

    if type_id == phase.FP_TYPE_DENDRITIC:
        return canvas.ChmDendriticOut32()
    elif type_id == phase.FP_TYPE_FDENDRITIC:
        # NOTE(review): the constructor argument is presumably the atom
        # typing scheme -- confirm against phase_database.cpp.
        return canvas.ChmDendriticOut32(4)
    elif type_id == phase.FP_TYPE_LINEAR:
        return canvas.ChmLinearOut32()
    elif type_id == phase.FP_TYPE_MACCS:
        return canvas.ChmMaccsOut32()
    elif type_id == phase.FP_TYPE_MOLPRINT2D:
        return canvas.ChmMolprint2D32()
    elif type_id == phase.FP_TYPE_RADIAL:
        return canvas.ChmRadialOut32()
    elif type_id == phase.FP_TYPE_ECFP4:
        g = canvas.ChmRadialOut32(10)
        g.setIterations(2)
        return g
    elif type_id == phase.FP_TYPE_ECFP6:
        g = canvas.ChmRadialOut32(10)
        g.setIterations(3)
        return g
    elif type_id == phase.FP_TYPE_FCFP4:
        g = canvas.ChmRadialOut32()
        g.setIterations(2)
        return g
    elif type_id == phase.FP_TYPE_FCFP6:
        g = canvas.ChmRadialOut32()
        g.setIterations(3)
        return g
    else:
        # Format the resolved integer ID, not the raw argument: `fp_type`
        # may be a numeric string, for which '%d' would raise TypeError.
        raise KeyError('unsupported fingerprint type %d' % type_id)
#------------------------------------------------------------------------------#
def bitset_to_list(bitset):
    '''
    Copies the entries of a Canvas bitset into a plain Python list.

    The first `bitset.count()` entries are read by index, in order.

    :param bitset: Canvas bitset.
    :type bitset: like `schrodinger.infra.canvas.ChmSparseBitset32`

    :return: Entries of the bitset, in index order.
    :rtype: `list`
    '''

    size = bitset.count()
    values = []
    for position in range(size):
        values.append(bitset[position])
    return values
#------------------------------------------------------------------------------#


def _serialize_bits(bits):
    '''
    Packs a Canvas bitset into a compressed buffer: the entries are
    listed (see `bitset_to_list`), dumped as JSON and zlib-compressed.

    :param bits: Canvas bitset.
    :type bits: like `schrodinger.infra.canvas.ChmSparseBitset32`

    :return: Compressed buffer.
    :rtype: `bytes`
    '''

    as_json = json.dumps(bitset_to_list(bits))
    return zlib.compress(as_json.encode('ascii'))


#------------------------------------------------------------------------------#


def _deserialize_bits(buf):
    '''
    Inverse of `_serialize_bits`: decompresses the buffer and parses
    the JSON document it holds.

    :param buf: Buffer produced by `_serialize_bits`.
    :type buf: bytes-like

    :return: Decoded JSON value (a list for `_serialize_bits` output).
    '''

    raw = zlib.decompress(buf)
    return json.loads(raw)


#------------------------------------------------------------------------------#


def _make_fingerprints_schema(typenames):
    '''
    Returns SQL statement to be used to create the `Fingerprints` table:
    a `mol_id` primary key followed by one BLOB column per fingerprint
    type name (in sorted order).

    :param typenames: Set of fingerprint type names.
    :type typenames: `set(str)`

    :return: SQL code.
    :rtype: `str`
    '''

    column_defs = ['mol_id INTEGER PRIMARY KEY']
    for fp_name in sorted(typenames):
        column_defs.append(fp_name + ' BLOB')

    return 'CREATE TABLE {name} ({columns})'.format(
        name=FINGERPRINTS_TABLE, columns=', '.join(column_defs))


#------------------------------------------------------------------------------#


def _parse_fingerprints_schema(sql):
    '''
    Parses 'CREATE TABLE ...' statement of the form produced by
    `_make_fingerprints_schema`. Returns list of fingerprint types
    (that may be empty), or `None` in case the schema is deemed invalid.

    :param sql: SQL statement.
    :type sql: `str`

    :return: List of (lower-cased) fingerprint types, or `None`.
    :rtype: `list(str)` or `None`
    '''

    statement = re.match(r'^\s*CREATE\s+TABLE\s+\w+\s*\(([^()]+)\)\s*$',
                         sql,
                         flags=re.IGNORECASE)
    if statement is None:
        return None

    id_column, *fp_columns = statement.group(1).split(',')

    # the first column must be the molecule ID
    id_match = re.match(r'^\s*mol_id\s+INTEGER\s+PRIMARY\s+KEY\s*$',
                        id_column,
                        flags=re.IGNORECASE)
    if id_match is None:
        return None

    names = []
    for column in fp_columns:
        found = re.match(r'^\s*(\w+)\s+BLOB\s*$', column, flags=re.IGNORECASE)
        if found is None:
            return None
        # NOTE: names are lower-cased, so 'molprint2D' is reported
        # as 'molprint2d'.
        names.append(str(found.group(1).lower()))

    return names


#------------------------------------------------------------------------------#
def get_fingerprint_types(filename):
    '''
    Determines whether `filename` is a valid multi-fp file, and if
    it is, returns stored fingerprint types.

    The file is deemed valid if it is an SQLite3 database holding a
    `Fingerprints` table whose schema `_parse_fingerprints_schema` accepts.

    :param filename: File name.
    :type filename: `str`

    :return: List of available fingerprint types or `None`.
    :rtype: `list(str)` or `None`
    '''

    if not os.path.isfile(filename):
        # otherwise empty SQLite3 DB would be created
        return None

    con = None
    try:
        con = sqlite3.connect(filename)  # TODO: read-only
        cur = con.execute(
            "SELECT sql FROM sqlite_master WHERE "
            "type='table' AND name=?", (FINGERPRINTS_TABLE,))
        row = cur.fetchone()
    except sqlite3.DatabaseError:
        # e.g. the file is not an SQLite3 database at all
        return None
    finally:
        # `with connection:` only commits/rolls back the transaction; the
        # connection has to be closed explicitly to avoid leaking it.
        if con is not None:
            con.close()

    if row is None:
        return None

    return _parse_fingerprints_schema(row[0])
#------------------------------------------------------------------------------#
class MultiFPFileWriter:
    '''
    Creates a multi-fingerprint file (SQLite3 DB) and appends to it the
    fingerprints computed for Canvas molecules.

    May be used as a context manager; the DB connection is closed on exit.
    '''

    def __init__(self, filename, fpnames):
        '''
        :param filename: Name of the file to create. It is passed to
            `fileutils.force_remove()` before the new DB is created.
        :type filename: `str`

        :param fpnames: Fingerprint type names (duplicates are ignored).
        :type fpnames: `list(str)`

        :raises KeyError: If a name is not in `FP_NAME_TO_TYPE`.
        '''

        self.fpnames = sorted(set(fpnames))
        # generators are ordered like the (sorted) columns of the table
        self.fpgenerators = \
            [make_fp_generator(FP_NAME_TO_TYPE[s]) for s in self.fpnames]

        # one placeholder for mol_id plus one per fingerprint type
        nvalues = 1 + len(self.fpnames)
        self.values_sql = 'VALUES(' + ','.join(['?'] * nvalues) + ')'

        fileutils.force_remove(filename)

        self._conn = sqlite3.connect(filename)
        try:
            self._conn.text_factory = str
            with self._conn:
                self._conn.execute(_make_fingerprints_schema(self.fpnames))
        except Exception:
            # do not leak the open connection if the table cannot be created
            self._conn.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, *a):
        self.close()

    def close(self):
        '''
        Closes the DB connection.
        '''

        self._conn.close()

    def append(self, chmol, molid=None):
        '''
        Computes and stores fingerprints for Canvas molecule.

        :param chmol: Canvas molecule.
        :type chmol: `schrodinger.canvas.ChmMol`

        :param molid: Molecule ID to use (must be unique). If not
            provided, next available value will be used.
        :type molid: `int`

        :return: Molecule ID for the just added set of fingerprints.
        :rtype: `int`
        '''

        packed = [molid]
        for g in self.fpgenerators:
            fp = g.generate(chmol)
            packed.append(sqlite3.Binary(_serialize_bits(fp)))

        # this may throw sqlite3.OperationalError for locked DB
        with self._conn:
            cur = self._conn.execute(
                'INSERT INTO ' + FINGERPRINTS_TABLE + ' ' + self.values_sql,
                packed)
            return cur.lastrowid
#------------------------------------------------------------------------------#
class MultiFPFile:
    '''
    Multi-fingerprint file (read-only, random access).

    May be used as a context manager; the DB connection is closed on exit.
    '''

    def __init__(self, filename):
        '''
        :param filename: File name.
        :type filename: `str`

        :raises ValueError: If `get_fingerprint_types()` does not
            recognize `filename` as a multi-fingerprint file.
        '''

        stored_types = get_fingerprint_types(filename)
        self.typenames = stored_types
        if stored_types is None:
            raise ValueError("'%s' is not a multi-fingerprint file" % filename)

        connection = sqlite3.connect(filename)  # TODO: read-only
        connection.text_factory = str
        self._conn = connection

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        '''
        Closes the DB connection.
        '''

        self._conn.close()

    def get_typenames(self):
        '''
        :return: Names of the fingerprint types stored in the file.
        :rtype: `list(str)`
        '''

        return self.typenames

    def get_mol_ids(self):
        '''
        :return: List of molecule IDs.
        :rtype: `list(int)`
        '''

        query = 'SELECT mol_id FROM ' + FINGERPRINTS_TABLE
        return [int(record[0]) for record in self._conn.execute(query)]

    def get_fingerprints(self, mol_id):
        '''
        :param mol_id: Molecule ID.
        :type mol_id: `int`

        :return: List of lists of the "on" bits for all types of
            fingerprints stored in the associated DB (ordered as the
            names returned by `get_typenames()`). Empty list if there
            is no entry for `mol_id`.
        :rtype: `list(list(int))`
        '''

        query = ('SELECT ' + ', '.join(self.typenames) + ' FROM ' +
                 FINGERPRINTS_TABLE + ' WHERE mol_id=?')
        record = self._conn.execute(query, (mol_id,)).fetchone()

        if record is None:
            return []
        return list(map(_deserialize_bits, record))

    def iter_fingerprints(self, typenames=None, molids=None):
        '''
        Generator to iterate over the fingerprints of the desired types,
        in the order of increasing molecule ID. Yields tuples that hold
        the molecule ID followed by one list of bits per requested type.

        :param typenames: List of desired fingerprint types
            (or `None` for all types stored in the file).
        :type typenames: list(str)

        :param molids: IDs of desired entries (or `None` for all).
            An empty container also selects all entries.
        :type molids: containment checkable for int
        '''

        if typenames is None:
            typenames = self.typenames

        columns = ', '.join(['mol_id'] + typenames)
        cursor = self._conn.execute('SELECT ' + columns + ' FROM ' +
                                    FINGERPRINTS_TABLE + ' ORDER BY mol_id')

        for record in cursor:
            mol_id = record[0]
            if not molids or mol_id in molids:
                bits = [_deserialize_bits(blob) for blob in record[1:]]
                yield tuple([mol_id] + bits)
#------------------------------------------------------------------------------#