Source code for schrodinger.application.glide.ligand_designer

"""
This module provides the APIs behind the Ligand Designer panel and workflow.
It combines binding site Phase hypothesis generation with R-group enumeration
and Glide grid generation and docking. The docking uses a Glide HTTP server for
speed.
"""

import glob
import hashlib
import json
import os
from pathlib import Path

import schrodinger
from schrodinger import structure
from schrodinger.application.glide import glide
from schrodinger.application.glide import http_client
from schrodinger.application.glide import utils as glide_utils
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import mmutil

# Maximum time in seconds to wait for Glide grid generation to finish
GRIDGEN_WAIT = 600
# Default number of workers
DEFAULT_NUM_WORKERS = 2

logger = log.get_output_logger(__file__)

if schrodinger.in_dev_env():
    logger.setLevel(log.DEBUG)

REFLIG_NAME = 'reflig.maegz'
GRIDFILE_NAME = 'grid.grd'
JOBDIR_SEP = '-'


[docs]def num_glide_workers(): """ Determines number of workers from environment variable. If number of workers is not an integer or <= 0 we use default value of 2. :return: number of workers :rtype: int """ try: nworkers = int(os.environ.get('LIGAND_DESIGNER_PROCESSORS', '2')) if nworkers <= 0: nworkers = DEFAULT_NUM_WORKERS except ValueError: nworkers = DEFAULT_NUM_WORKERS logger.debug(f'Using {nworkers} Glide server workers') return nworkers
[docs]class GridgenRunningException(RuntimeError):
[docs] def __init__(self, message="Grid generation is still running"): super().__init__(message)
[docs]class BuildServerTask(tasks.BlockingFunctionTask): """ Task to set up and start a glide server. The server performs core-constrained Glide docking. The server state and intermediate files are in a scratch directory. A unique subdirectory is created for each ligand-receptor complex; if another object is created for the same complex, it will share the same directory. This allows the reuse of existing grid files, for example. However, only one object at a time can be performing an enumeration because the underlying Glide server process is single-threaded. :ivar gridJobStarted: Signal when grid job has launched. Emitted with task """ gridJobStarted = QtCore.pyqtSignal(jobtasks.CmdJobTask) gg_task = parameters.NonParamAttribute()
[docs] class Input(parameters.CompoundParam): ligand_st: structure.Structure = None receptor_st: structure.Structure = None docking_keywords: dict start_gridgen: bool
[docs] class Output(parameters.CompoundParam): server: http_client.NonBlockingGlideServerManager = None
[docs] def initConcrete(self, tmpdir=None, *args, **kwargs): """ :param tmpdir: Base temporary directory for server directories """ super().initConcrete(*args, **kwargs) self._tmpdir = tmpdir self.gg_task = None
@tasks.preprocessor(order=tasks.BEFORE_TASKDIR) def _initGridgenTask(self): if not self.input.start_gridgen or self.gg_task is not None: return self.gg_task = GridgenTask() self.gg_task.input.ligand_st = self.input.ligand_st self.gg_task.input.receptor_st = self.input.receptor_st @tasks.preprocessor(order=tasks.BEFORE_TASKDIR + 1) def _findTaskDir(self): """ Check existing task dirs to see if any have a grid compatible with the current ligand_st and receptor_st """ if self.taskDirSetting() is not self.DEFAULT_TASKDIR_SETTING: # Skip if we have already set a taskdir return recepname = fileutils.get_jobname(self.input.receptor_st.title) signature = get_structure_digest(self.input.receptor_st) jobname = '-'.join(['ld', recepname, signature]) tmpdir = self._tmpdir or fileutils.get_directory_path(fileutils.TEMP) jobdir_stem = Path(tmpdir) / 'ligand_designer' / jobname jobdir_stem = jobdir_stem.absolute() jobdir = self._checkExistingJobDirs(jobdir_stem) if not jobdir: jobdir = fileutils.get_next_filename(str(jobdir_stem), JOBDIR_SEP) jobdir = Path(jobdir) logger.debug('Jobdir: %s', jobdir) self.specifyTaskDir(jobdir) self.gg_task.specifyTaskDir(jobdir) def _checkExistingJobDirs(self, jobdir_stem): dir_pattern = f"{jobdir_stem}{JOBDIR_SEP}*" for jobdir in glob.iglob(dir_pattern): self.gg_task.specifyTaskDir(jobdir) if self.gg_task.checkGridfile() or self.gg_task.checkReflig(): return jobdir
[docs] def mainFunction(self): self.output.reset() gg_task = self.gg_task grid_ok = gg_task.checkGridfile() if self.input.start_gridgen: if not grid_ok: gg_task.start() self.gridJobStarted.emit(gg_task) else: gg_task.updateReflig() elif grid_ok: self._startServer() else: # Fail if the gridfile isn't acceptable but we aren't supposed to # run gridgen if gg_task.status is gg_task.RUNNING: exc = GridgenRunningException() else: exc = RuntimeError("Gridgen failed") self._recordFailure(exc)
def _startServer(self): gg_task = self.gg_task docking_keywords = { 'PRECISION': 'HTVS', 'GRIDFILE': gg_task.getTaskFilename(GRIDFILE_NAME), 'REF_LIGAND_FILE': gg_task.getTaskFilename(REFLIG_NAME), 'CORE_DEFINITION': 'mcssmarts', 'CORE_RESTRAIN': True, 'CORECONS_FALLBACK': True, 'WRITE_RES_INTERACTION': True, 'GEOCHECK_FACTOR': 0.9, } docking_keywords.update(self.input.docking_keywords) kwargs = dict( jobdir=self.getTaskDir(), use_jc=False, ) if kwargs['use_jc']: # Only set jobname for jobcontrol to allow reattaching kwargs['jobname'] = 'glide_server' if mmutil.feature_flag_is_enabled(mmutil.FAST_LIGAND_DESIGNER): ServerCls = http_client.NonBlockingGlideServerManagerZmq kwargs['nworkers'] = num_glide_workers() else: ServerCls = http_client.NonBlockingGlideServerManager kwargs['timeout'] = 1200 # 20 minutes server = ServerCls(docking_keywords, **kwargs) ready = server.isReady() # Check whether server is already running if not ready: try: server.start() except Exception as exc: self._recordFailure(exc) return else: ready = True if ready: self.output.server = server
[docs]class GridgenTask(jobtasks.CmdJobTask): """ Task to run glide grid generation. :cvar RUNNING_MESSAGE: Message for RUNNING status :cvar FAILED_MESSAGE: Message for FAILED status """ RUNNING_MESSAGE = 'Generating grid...' FAILED_MESSAGE = 'Grid generation failed' infile = parameters.NonParamAttribute()
[docs] class Input(parameters.CompoundParam): ligand_st: structure.Structure = None receptor_st: structure.Structure = None
[docs] class Output(jobtasks.CmdJobTask.Output): gridfile: str = None
[docs] def initConcrete(self): super().initConcrete() self.name = os.path.splitext(GRIDFILE_NAME)[0] self.infile = self.name + ".in"
@tasks.preprocessor(order=tasks.AFTER_TASKDIR) def _writeInputs(self): logger.debug("Writing gridgen input files") self._writeReflig() pvfile = self.name + "_in.maegz" with structure.StructureWriter(self.getTaskFilename(pvfile)) as writer: writer.append(self.input.receptor_st) writer.append(self.input.ligand_st) keywords = { 'RECEP_FILE': pvfile, 'LIGAND_INDEX': 2, 'GRIDFILE': GRIDFILE_NAME, } glide_job = glide.get_glide_job(keywords) infile = self.getTaskFilename(self.infile) glide_job.writeSimplified(infile)
[docs] def makeCmd(self): return ['$SCHRODINGER/glide', self.infile]
@tasks.postprocessor def _checkOutput(self): gridfile = self.getTaskFilename(GRIDFILE_NAME) if not os.path.isfile(gridfile): return False, "Gridfile not found" self.output.gridfile = gridfile
[docs] def checkGridfile(self) -> bool: """ Return whether the specified taskdir contains a gridfile compatible with the input ligand. """ taskdir = self.taskDirSetting() if not isinstance(taskdir, (str, Path)): raise ValueError("Can only be used with a specified taskdir") elif not os.path.exists(taskdir): raise ValueError("Can only be used with an existing taskdir") self._createTaskDir() # will be a no-op but allows calling getTaskDir return self._checkGridfile()
def _checkGridfile(self) -> bool: """ Check whether the taskdir contains a gridfile compatible with the input ligand. If so, writes the ref ligand file if it's missing. """ grid_file = self.getTaskFilename(GRIDFILE_NAME) if (glide_utils.check_required_gridfiles(grid_file) and glide_utils.is_grid_good_for_ligand(grid_file, self.input.ligand_st)): self._writeReflig(overwrite=False) return True return False
[docs] def checkReflig(self) -> bool: """ Whether the ref ligand file is equivalent to the input ligand """ ref_lig_file = self.getTaskFilename(REFLIG_NAME) if os.path.isfile(ref_lig_file): ref_lig_st = structure.Structure.read(ref_lig_file) if ref_lig_st.isEquivalent(self.input.ligand_st): return True return False
def _writeReflig(self, overwrite: bool = True): ref_lig_file = self.getTaskFilename(REFLIG_NAME) if overwrite or not os.path.isfile(ref_lig_file): self.input.ligand_st.write(ref_lig_file)
[docs] def updateReflig(self): """ Overwrite the ref ligand if it isn't equivalent to the input ligand """ if not self.checkReflig(): self._writeReflig()
[docs]def read_json_file(filename): """ Read a JSON file. If there are issues reading it (doesn't exist, syntax errors...) quietly return an empty dict. :type filename: str :rtype: object """ try: with open(filename) as fh: return json.load(fh) except (IOError, ValueError): return {}
[docs]def md5sum(input_str): """ MD5 hex digest of a string. :type input_str: str :rtype: str """ m = hashlib.md5() m.update(input_str.encode('utf-8')) return m.hexdigest()
[docs]def get_structure_digest(st, length=8): """ Return an abbreviated MD5 hex digest given a Structure, considering only element, formal charge, and XYZ coordinates. :param st: input structure (not modified) :type st: schrodinger.structure.Structure :param length: digest length in hex digits :type length: int :return: hex digest :rtype: str """ receptor_str = '\n'.join( '{} {:d} {:.3} {:.3} {:.3}'.format(a.element, a.formal_charge, *a.xyz) for a in st.atom) return md5sum(receptor_str)[:length]