Source code for schrodinger.application.glide.http_client

"""
Glide HTTP client

This module implements objects that connect to a Glide HTTP server (see
schrodinger.application.glide.http_server) and use the server to dock
ligands remotely.

Sample usage::

    from schrodinger import structure
    from schrodinger.application.glide import http_client

    client = http_client.HTTPClient(host='localhost', port=8000)
    ct = structure.Structure.read('mylig.mae')
    poses = client.dock(ct)
    for pose in poses:
        print("gscore=%f" % pose.properties['r_i_glide_gscore'])

For a higher level API that also starts up and monitors the server itself,
see `GlideServerManager`.
"""
import http
import http.client
import itertools
import json
import os
import random
import socket
import sys
import time
import urllib
import urllib.parse
import uuid
import zlib
from pathlib import Path

import backoff
import zmq

from schrodinger import structure
from schrodinger.application.glide import glide
from schrodinger.job import files as jobfiles
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtNetwork
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import subprocess

# Logger used throughout this module; created from this file's path.
logger = log.get_output_logger(__file__)

# Maximum time in seconds to wait for the Glide server to be ready.
MAX_WAIT = 60

# Launcher paths under the installation directory. NOTE: importing this module
# raises KeyError when the SCHRODINGER environment variable is not set.
# GLIDE is the command used to launch the server under job control; RUN is the
# command used to launch the Glide backend as a plain subprocess.
GLIDE = os.path.join(os.environ['SCHRODINGER'], 'glide')
RUN = os.path.join(os.environ['SCHRODINGER'], 'run')
# Reported by NonBlockingBackendChecker when the server job has status
# 'finished' while the checker is still waiting for it to become ready.
SERVER_FINISHED_MESSAGE = "Server timed out while waiting for ligands"
# Fallback error text used by NonBlockingGlideServerManager when the backend
# checker reports "not ready" without a more specific error.
SERVER_NOT_READY_MESSAGE = "Glide server was not ready"


class ServerTimedOutError(RuntimeError):
    """
    Error reported when the Glide server fails to become ready in time (the
    default limit is `MAX_WAIT` seconds).
    """
class GlideResult:
    """
    Sequence-like container holding the poses that Glide returned for a single
    ligand. If Glide produced no poses, `.message` carries the reason.
    """

    def __init__(self, poses, message=''):
        """
        :param poses: poses returned by glide (may be empty)
        :type poses: list of schrodinger.structure.Structure

        :param message: status message explaining why no poses were returned
        :type message: str
        """
        self.poses = poses
        self.message = message

    def __len__(self):
        return len(self.poses)

    def __getitem__(self, idx):
        return self.poses[idx]

    def __iter__(self):
        # Iteration hands out a GlideResultIterator so that the status message
        # stays reachable from the iterator as well.
        return GlideResultIterator(self.poses, self.message)

    def toJson(self):
        """
        Serialize the docking result to JSON. All poses are concatenated into
        a single Maestro-format string stored under the 'poses' key.

        :return: JSON
        :rtype: str
        """
        pose_blocks = [
            structure.write_ct_to_string(pose) for pose in self.poses
        ]
        payload = {'message': self.message, 'poses': ''.join(pose_blocks)}
        return json.dumps(payload)

    @classmethod
    def fromJson(cls, json_str):
        """
        Build a GlideResult from the JSON produced by `toJson`.

        :param json_str: JSON representation
        :type json_str: str

        :return: GlideResult
        :rtype: GlideResult
        """
        payload = json.loads(json_str)
        m2io = payload['poses']
        if not m2io:
            # An empty 'poses' entry means no structures; skip the reader.
            return cls([], payload['message'])
        reader = structure.MaestroReader('', input_string=m2io)
        return cls(list(reader), payload['message'])
class GlideResultIterator:
    """
    Iterator over the poses that Glide returned for a single ligand. If Glide
    produced no poses, `.message` carries the reason.
    """

    def __init__(self, poses, message=''):
        """
        :param poses: poses returned by glide (may be empty)
        :type poses: list of schrodinger.structure.Structure

        :param message: status message explaining why no poses were returned
        :type message: str
        """
        self.poses = poses
        self.message = message
        # Underlying iterator; consumed by __next__.
        self._iter = iter(poses)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iter)

    def asGlideResult(self):
        """
        Wrap the full pose list and message in a GlideResult. The result holds
        all poses regardless of how far this iterator has been advanced.

        :return: GlideResult
        :rtype: GlideResult
        """
        return GlideResult(self.poses, self.message)
class AbstractHTTPClient:
    """
    Interface for connecting to a Glide HTTP server.
    """

    def dock(self, ct):
        """
        Dock the ligand in Structure object `ct` using the remote Glide server.
        Must be implemented by subclasses.
        """
        raise NotImplementedError()

    def shutdown_server(self):
        """
        Ask the Glide HTTP server to shut down. Must be implemented by
        subclasses.
        """
        raise NotImplementedError()

    @staticmethod
    def ct_to_multipart(ct):
        """
        Encode a CT in multipart/form-data format, ready to POST. The
        structure is sent as a single form field named "lig".

        :param ct: Structure to encode
        :type ct: structure.Structure

        :return: The body of the request and the boundary
        :rtype: tuple(str, str)
        """
        cts = structure.write_ct_to_string(ct)
        boundary = "----Boundary%d" % random.randint(0, 100000)
        # The boundary must not occur inside the payload; keep extending it
        # with random digits until it does not (unlikely to be needed).
        while boundary in cts:
            boundary += "%d" % random.randint(0, 100000)
        part_header = (
            f'--{boundary}\r\n'
            'Content-Disposition: form-data; name="lig"; filename="lig.mae"\r\n'
            'Content-Type: application/octet-stream\r\n\r\n')
        body = f'{part_header}{cts}--{boundary}--\r\n'
        return body, boundary
class HTTPClient(AbstractHTTPClient):
    """
    This class provides an API to connect to an existing Glide HTTP server.
    For a higher level API that also starts up and monitors the server itself,
    see `GlideServerManager`.
    """

    def __init__(self, con=None, host="localhost", port=8000, timeout=1000):
        """
        Initialize a new HTTPClient object. The optional 'con' is an existing
        http.client.HTTPConnection object. If not provided, then 'host',
        'port', and 'timeout' will be used to create one. The default timeout
        value is very large to make sure that it is enough for most docking
        jobs to finish.
        """
        if con:
            self.con = con
        else:
            self.con = http.client.HTTPConnection(host, port, timeout=timeout)

    def dock(self, ct):
        """
        Dock the ligand in Structure object 'ct' using the remote Glide server.

        :param ct: input ligand
        :type ct: schrodinger.structure.Structure

        :return: docking results as an iterator
        :rtype: GlideResultIterator

        :raises IOError: if the server answers with a status other than 200
        """
        # This method returns an iterator for backward compatibility; it used to
        # return a MaestroReader, and some callers expect to be able to call
        # next() on it. The higher-level version in the newer GlideServerManager
        # class returns the sequence-like GlideResult, which is more versatile.
        return self._postLigand('/dock_ligand', ct)

    def setReferenceLigand(self, ct):
        """
        Tell the server to use a new reference ligand when docking subsequent
        ligands. (This only has an effect if the job started with a reference
        ligand, for example when the job uses core constraints.)

        :param ct: new reference ligand
        :type ct: schrodinger.structure.Structure

        :return: Glide results iterator (empty, but with an "Updated reflig"
            message)
        :rtype: GlideResultIterator

        :raises IOError: if the server answers with a status other than 200
        """
        return self._postLigand('/set_reflig', ct)

    def _postLigand(self, path, ct):
        """
        POST the structure `ct` as multipart/form-data to `path` and return
        the parsed response.
        """
        body, boundary = self.ct_to_multipart(ct)
        self.con.request(
            "POST",
            path,
            body,
            headers={
                "Content-type": "multipart/form-data; boundary=%s" % boundary
            })
        return self._processResponse()

    def dockSmiles(self, smiles):
        """
        Dock a ligand from SMILES. For best results, the server should have been
        launched with the LIGPREP keyword enabled.

        :param smiles: ligand SMILES
        :type smiles: str

        :return: docking results as an iterator
        :rtype: GlideResultIterator

        :raises IOError: if the server answers with a status other than 200
        """
        qs = urllib.parse.urlencode({'smiles': smiles})
        self.con.request("GET", "/dock_smiles?" + qs)
        return self._processResponse()

    @staticmethod
    def _raiseForStatus(res):
        """
        Raise IOError unless the response status is 200 OK.

        Before raising, the response body is drained on a best-effort basis:
        http.client requires the previous response to be fully read before
        the same connection can be used for another request, so skipping this
        would leave `self.con` unusable after a failed request.

        :param res: response to check
        :type res: http.client.HTTPResponse

        :raises IOError: if the status is not 200 OK
        """
        if res.status == http.HTTPStatus.OK:
            return
        try:
            res.read()
        except (http.client.HTTPException, OSError):
            # Draining is only a courtesy to keep the connection reusable;
            # the bad status below is the error the caller needs to see.
            pass
        raise IOError("Bad response %d %s" % (res.status, res.reason))

    def _processResponse(self):
        """
        Read the pending response and decode its JSON body.

        :return: docking results as an iterator
        :rtype: GlideResultIterator

        :raises IOError: if the status is not 200 OK
        """
        res = self.con.getresponse()
        self._raiseForStatus(res)
        body = res.read().decode('utf-8')
        return iter(GlideResult.fromJson(body))

    def shutdown_server(self):
        """
        Ask the Glide HTTP server to shut down.

        :raises IOError: if the server answers with a status other than 200
        """
        self.con.request("GET", "/shutdown")
        res = self.con.getresponse()
        self._raiseForStatus(res)
class NonBlockingHTTPClient(AbstractHTTPClient, QtCore.QObject):
    """
    Class for connecting to a Glide HTTP server and docking poses without
    blocking.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.

    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses

    :ivar errorOccurred: Signal emitted when there is an error communicating
        with the server.

    :ivar finished: Signal emitted when an HTTP request finishes. Note: aliased
        from self.manager.finished
    """

    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    errorOccurred = QtCore.pyqtSignal(str)

    def __init__(self, host=None, port=8000):
        """
        :param host: Hostname for server. A missing scheme defaults to http;
            None means http://localhost.
        :type host: str

        :param port: Port for server
        :type port: int
        """
        super().__init__()
        if host is None:
            host = "http://localhost"
        elif "://" not in host:
            host = f"http://{host}"
        self.dock_url = self._makeQUrl(host, port, "dock_ligand")
        self.shutdown_url = self._makeQUrl(host, port, "shutdown")
        self.dock_smiles_url = self._makeQUrl(host, port, "dock_smiles")
        self.manager = QtNetwork.QNetworkAccessManager()
        self.manager.finished.connect(self._onDockingFinished)
        self.finished = self.manager.finished

    @staticmethod
    def _makeQUrl(host: str, port: int, path: str):
        """
        Converts a host, port, and path into a QUrl

        :param host: Host (e.g. http://localhost)
        :param port: Port to connect to
        :param path: Path relative to the host
        :rtype: QtCore.QUrl
        """
        parsed_host = urllib.parse.urlparse(host)
        url_parts = list(parsed_host)
        # Index 2 of the urlparse 6-tuple is the path component.
        url_parts[2] = path
        url_str = urllib.parse.urlunparse(url_parts)
        qurl = QtCore.QUrl(url_str)
        qurl.setPort(port)
        return qurl

    def dock(self, ct):
        """
        Docks the structure without blocking. The outcome is reported through
        the posesDocked, noPosesDocked and errorOccurred signals.

        :param ct: Structure to dock
        :type ct: schrodinger.structure.Structure
        """
        body, boundary = self.ct_to_multipart(ct)
        request = QtNetwork.QNetworkRequest(self.dock_url)
        content_key = "Content-type"
        content_value = f"multipart/form-data; boundary={boundary}"
        request.setRawHeader(content_key.encode("utf-8"),
                             content_value.encode("utf-8"))
        self.manager.post(request, body.encode("utf-8"))

    def dockSmiles(self, smiles):
        """
        Dock a ligand from SMILES without blocking.

        :param smiles: ligand SMILES
        :type smiles: str
        """
        query = QtCore.QUrlQuery()
        query.addQueryItem('smiles', smiles)
        self.dock_smiles_url.setQuery(query)
        request = QtNetwork.QNetworkRequest(self.dock_smiles_url)
        self.manager.get(request)

    def _onDockingFinished(self, reply):
        """
        Checks reply and emits poses if any were produced.

        The reply is always scheduled for deletion, whichever way handling
        ends; otherwise every error, shutdown or "no poses" reply would stay
        alive for as long as the network access manager does.

        :param reply: Network reply
        :type reply: QtNetwork.QNetworkReply
        """
        try:
            self._handleReply(reply)
        finally:
            # deleteLater() only takes effect once control returns to the
            # event loop, so other slots connected to `finished` still run
            # with a valid reply.
            reply.deleteLater()

    def _handleReply(self, reply):
        """
        Inspect a finished reply and emit the matching signal.

        :param reply: Network reply
        :type reply: QtNetwork.QNetworkReply
        """
        error = reply.error()
        if error != reply.NoError:
            self.errorOccurred.emit(reply.errorString())
            return
        status = reply.attribute(
            QtNetwork.QNetworkRequest.HttpStatusCodeAttribute)
        if status != http.HTTPStatus.OK:
            self.errorOccurred.emit(f"Bad response: {status}")
            return
        # A GET without a query string is the reply to shutdown_server()
        # (dockSmiles always sets a query); it carries no docking result.
        if (reply.operation() == QtNetwork.QNetworkAccessManager.GetOperation
                and not reply.url().hasQuery()):
            return
        data = reply.readAll()
        data_str = str(data.data(), encoding="utf-8")
        if data_str.strip() == "":
            self.errorOccurred.emit("Server didn't produce an error but "
                                    "returned no structure data")
            return
        result = GlideResult.fromJson(data_str)
        if len(result) == 0:
            # TODO: pass result.message so client knows why there are no poses.
            self.noPosesDocked.emit()
            return
        self.posesDocked.emit(list(result))

    def shutdown_server(self):
        """
        Requests shutting down the server without blocking.
        """
        request = QtNetwork.QNetworkRequest(self.shutdown_url)
        self.manager.get(request)
class SubprocessJobAdapter:
    """
    Thin wrapper that launches a subprocess and exposes the small part of the
    jobcontrol.Job interface that this module relies on (JobId, Status,
    readAgain, kill, isComplete). It lets GlideServerManager handle job
    control jobs and plain subprocesses through the same calls; it is not a
    full Job replacement.
    """

    def __init__(self, cmd, logfile):
        """
        :param cmd: command line to execute
        :type cmd: list of str

        :param logfile: file to use for log stdout/stderr
        :type logfile: file
        """
        # stderr is merged into stdout so both end up in the log file; stdin
        # is detached from the parent.
        self.proc = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=logfile,
            stderr=subprocess.STDOUT,
        )
        # The PID stands in for a job ID.
        self.JobId = str(self.proc.pid)

    def readAgain(self):
        """
        No-op, present for compatibility with jobcontrol.Job; `Status` polls
        the process directly.
        """
        pass

    @property
    def Status(self):
        """
        'running' while the process is alive; afterwards 'finished' for a
        zero exit code and 'died' for anything else.
        """
        exit_code = self.proc.poll()
        if exit_code is None:
            return 'running'
        return 'finished' if exit_code == 0 else 'died'

    def kill(self):
        """
        Terminate the process and reap it, escalating to a hard kill if it
        has not exited after a few seconds.
        """
        self.proc.terminate()
        # Collect the return status so that no "still running" subprocess
        # warnings are produced. Normally this takes about 0.2 s; allow a few
        # seconds before resorting to a hard kill.
        try:
            self.proc.wait(timeout=3.0)
        except subprocess.TimeoutExpired:
            self.proc.kill()
            self.proc.wait()

    def isComplete(self):
        """
        :return: True once the process is no longer running
        :rtype: bool
        """
        return self.Status != 'running'
class AbstractGlideServerManager:
    """
    Shared logic for launching, monitoring and stopping a Glide server job.

    Subclasses are expected to provide the `config_ext` class attribute, the
    `client` attribute/property, and the `_startJC` / `_startNoJC` launchers.
    """

    def __init__(self, keywords, jobdir='.', jobname=None, use_jc=True):
        """
        :param keywords: Glide keywords to use for the job. The only required
            keyword is GRIDFILE.
        :type keywords: dict

        :param jobdir: job directory
        :type jobdir: str

        :param jobname: basename for input and output files and for job
            control. By default, a random jobname is chosen.
        :type jobname: str

        :param use_jc: run the Glide backend under job control?
        :type use_jc: bool

        :raises FileNotFoundError: if `jobdir` does not exist
        """
        super().__init__()
        self.keywords = keywords
        # strict=True makes resolve() fail for a nonexistent job directory.
        self.jobdir = Path(jobdir).resolve(strict=True)
        self.jobname = jobname if jobname else str(uuid.uuid4())
        self.job = None
        self.use_jc = use_jc
        config_name = self.jobname + self.config_ext
        self.config_file = self.jobdir / config_name
        self._readConfig()

    def start(self, wait=None):
        """
        Launch the Glide Server job. By default, returns as soon as the job
        is launched, but to make sure that the job is ready to dock call
        .isReady() until True.

        :param wait: if given, wait for the server to be ready up to `wait`
            seconds; if the server is still not ready then, raise a
            RuntimeError.
        :type wait: int or NoneType
        """
        # TODO: we could consider support for launching to a remote host.
        self._clearConfig()
        launch = self._startJC if self.use_jc else self._startNoJC
        launch()
        logger.info("Launched Glide server job: %s", self.job.JobId)
        if wait:
            self.waitUntilReady(wait)

    def isReady(self):
        """
        The server counts as ready when its config data is available and the
        job is running.

        :return: is the server ready to dock?
        :rtype: bool
        """
        logger.debug('isReady: %s', self.config)
        if not (self.config and self.job):
            return False
        self.job.readAgain()
        logger.debug('job.Status: %s', self.job.Status)
        return self.job.Status == 'running'

    def waitUntilReady(self, timeout=60):
        """
        Wait for the server to be ready, polling once per second. If the
        timeout is reached and the server is still not ready, the job is
        killed and a RuntimeError is raised. A RuntimeError is also raised if
        the job completes before becoming ready.

        :param timeout: maximum wait in seconds
        :type timeout: int
        """
        started = time.time()
        while True:
            elapsed = time.time() - started
            if not elapsed < timeout:
                self.job.kill()
                raise RuntimeError("Timed out while waiting for Glide server")
            time.sleep(1)
            if self.isReady():
                logger.info("Glide server ready after %.1f s", elapsed)
                return
            if self.job.isComplete():
                raise RuntimeError(
                    f"Glide backend failed to start after {elapsed:.1f} s")

    def stop(self, wait=0.0):
        """
        Stop the server. First it will try to send it a shutdown request via
        the network; if that doesn't work, it will kill via job control.

        :param float wait: wait up to this time in seconds for the backend
            process to exit before killing it and returning. If wait==0.0 and
            the shutdown message was sent successfully via the network, return
            immediately without killing.
        """
        if self.job is None:
            return
        self.job.readAgain()
        if self.job.Status == "running":
            try:
                self.client.shutdown_server()
            except (socket.error, ValueError):
                # Ignore; the kill at the end of this method takes over.
                pass
            else:
                if not wait:
                    return

                # Re-check every 0.5 s, for at most `wait` seconds, whether
                # the job has exited.
                @backoff.on_predicate(backoff.constant,
                                      max_time=wait,
                                      interval=0.5)
                def _job_exited():
                    self.job.readAgain()
                    return self.job.isComplete()

                if _job_exited():
                    return
        # NOTE: call to shutdown_server() could have caused self.job to be
        # set to None, even if it failed with exception.
        if self.job is not None and not self.job.isComplete():
            self.job.kill()

    def _startJC(self):
        """
        Launch the Glide server using job control.
        """
        raise NotImplementedError()

    def _startNoJC(self):
        """
        Launch the Glide server as a subprocess, without job control.
        """
        raise NotImplementedError()

    def _clearConfig(self):
        """
        Clear the server config data (e.g., host/port) and remove the config
        file.
        """
        fileutils.force_remove(self.config_file)
        self.job = None
        self._readConfig()

    def _readConfig(self):
        """
        Read the server config file, if it exists.

        Under job control, the file is first fetched from the job when it is
        not present locally, and a job is attached from the 'jobid' entry of
        the config when no job is set yet.
        """
        must_fetch = (self.use_jc and self.job is not None and
                      not self.config_file.exists())
        if must_fetch:
            config_base = os.path.basename(self.config_file)
            self.job.readAgain()
            try:
                with jobfiles.get_file(self.job, config_base) as tmpconfig:
                    fileutils.force_rename(tmpconfig, self.config_file)
            except RuntimeError:
                # This is a normal sign that the server isn't ready yet.
                # It is up to the caller to retry.
                pass
        try:
            self._config = json.loads(self.config_file.read_text())
        except (IOError, ValueError):
            # Either the file isn't there or doesn't contain valid JSON. The
            # latter has been known to happen due to race conditions.
            self._config = {}
        # We only support "attaching" to an already running server when
        # using job control. We could in principle do the same using the PID
        # when not using job control, but YAGNI plus PIDs may be reused.
        if self.job is None and self.use_jc:
            jobid = self._config.get('jobid')
            if jobid:
                try:
                    self.job = jobcontrol.Job(jobid)
                except RuntimeError:
                    # Maybe job is too old and job record is gone
                    pass

    @property
    def config(self):
        """
        A dictionary with the information needed to connect with the server:
        host, port, and jobid. This data is obtained from a JSON file written
        by the server. If the file does not exist (yet?), the dict will be
        empty; in that case every access retries reading the file.
        """
        if not self._config:
            self._readConfig()
        return self._config
class GlideServerManager(AbstractGlideServerManager):
    """
    A class to start, stop, monitor, and use a Glide HTTP server.

    Sample use::

        server = GlideServerManager({'GRIDFILE': 'grid.zip'})
        server.start()
        while not server.isReady():
            time.sleep(1)
        poses = server.dock(st)
        server.stop()
    """

    client_module = 'schrodinger.application.glide.http_server'
    config_ext = '_http.json'

    def __init__(self,
                 *args,
                 host='localhost',
                 port=0,
                 timeout=1000,
                 **kwargs):
        """
        See parent class for additional arguments.

        :param host: host to which the server should bind to
        :type host: str

        :param port: port where the server should listen. If zero, pick one
            automatically.
        :type port: int

        :param timeout: the server will shut down automatically if this time,
            in seconds, passes without receiving any connections.
        :type timeout: int
        """
        super().__init__(*args, **kwargs)
        client_options = 'host=%s;port=%d;timeout=%d' % (host, port, timeout)
        # Start from the user's keywords; the server-specific entries below
        # take precedence over any user-supplied values for the same keys.
        server_keywords = dict(self.keywords)
        server_keywords.update({
            'CLIENT_MODULE': 'schrodinger.application.glide.http_server',
            'CLIENT_OPTIONS': client_options,
            'JOBNAME': self.jobname,
        })
        self.glide_job = glide.Dock(server_keywords)

    def _startJC(self):
        """
        Write the simplified input file into the job directory and launch it
        with the glide command under job control.
        """
        input_path = os.path.join(self.jobdir, self.jobname + '.in')
        self.glide_job.writeSimplified(input_path)
        cmd = [GLIDE, input_path]
        logger.debug("Launching: %s", ' '.join(cmd))
        self.job = jobcontrol.launch_job(cmd)

    def _startNoJC(self):
        """
        Write the JSON input file into the job directory and run the Glide
        backend as a subprocess, logging to <jobname>.log.
        """
        input_path = os.path.join(self.jobdir, self.jobname + '.js')
        self.glide_job.writeJSON(input_path)
        cmd = [RUN, '-FROM', 'glide', 'glide_backend', input_path]
        logger.debug("Launching: %s", ' '.join(cmd))
        log_path = os.path.join(self.jobdir, self.jobname + '.log')
        # The handle is only needed while the subprocess is being spawned.
        with open(log_path, 'w') as log_fh:
            self.job = SubprocessJobAdapter(cmd, log_fh)

    def dock(self, st):
        """
        Dock a ligand. The returned result may contain no poses. If there was
        a problem connecting to the server, socket.error exceptions may be
        propagated.

        :param st: Structure to dock
        :type st: schrodinger.structure.Structure

        :return: docking result
        :rtype: GlideResult
        """
        logger.debug('Docking ligand: %s', st.title)
        glide_result = self.client.dock(st).asGlideResult()
        logger.debug('  got %d poses', len(glide_result))
        return glide_result

    def dockSmiles(self, smiles):
        """
        Dock a ligand from SMILES. For best results, the server should have been
        launched with the LIGPREP keyword enabled.

        :param smiles: ligand SMILES
        :type smiles: str

        :return: docking result
        :rtype: GlideResult
        """
        logger.debug('Docking ligand SMILES: %s', smiles)
        glide_result = self.client.dockSmiles(smiles).asGlideResult()
        logger.debug('  got %d poses', len(glide_result))
        return glide_result

    def setReferenceLigand(self, st):
        """
        Tell the server to use a new reference ligand when docking subsequent
        ligands. (This only has an effect if the job started with a reference
        ligand, for example when the job uses core constraints.)

        :param st: new reference ligand
        :type st: schrodinger.structure.Structure

        :return: Glide result (empty, but with an "Updated reflig" message)
        :rtype: GlideResult
        """
        return self.client.setReferenceLigand(st).asGlideResult()

    @property
    def client(self):
        """
        Client object to be used for connecting to the Glide server process.
        A new client is created on every access, from the host and port in
        the server config.

        :rtype: HTTPClient
        :return: the client object.

        :raises ValueError: if the server config is not available yet
        """
        if not self.config:
            raise ValueError('Glide server is not ready')
        server_config = self.config
        return HTTPClient(host=server_config['host'],
                          port=server_config['port'])
class NonBlockingGlideServerManager(GlideServerManager, QtCore.QObject):
    """
    A class to use a Glide HTTP server without blocking.

    Ligands are docked one at a time; a batch is fed from an internal queue
    each time the previous request finishes.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.

    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses

    :ivar errorOccurred: Signal emitted when there is an error communicating
        with the server.

    :ivar batchFinished: Signal emitted when a docking batch finishes. Emitted
        with the number of ligands that were docked.
    """

    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    batchFinished = QtCore.pyqtSignal(int)
    errorOccurred = QtCore.pyqtSignal(str)

    def __init__(self, *args, **kwargs):
        # See GlideServerManager for argument documentation
        super().__init__(*args, **kwargs)
        self._docking_active = False
        self._resetBatch()
        # The non-blocking client is created lazily by the `client` property.
        self._client = None
        checker = NonBlockingBackendChecker()
        self._backend_checker = checker
        checker.setBackend(self)

    def stop(self, *args, **kwargs):
        """
        Stop polling for backend readiness, then stop the server. Arguments
        are passed through to the parent class.
        """
        self._backend_checker.stopChecking()
        super().stop(*args, **kwargs)

    def dock(self, st):
        """
        Docks a given structure. Results are delivered through signals.

        :param st: Structure or SMILES to dock
        :type st: structure.Structure or str

        :raises ValueError: if the server config is still unavailable after
            waiting for the backend (raised by the `client` property)
        """
        self._docking_active = True
        client = self._getClient()
        # Strings are treated as SMILES, anything else as a structure.
        submit = client.dockSmiles if isinstance(st, str) else client.dock
        submit(st)

    def dockBatch(self, structures, done_adding=True):
        """
        Set a batch of structures to dock. Any previous batch state is
        discarded.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]

        :param bool done_adding: Whether to emit batchFinished when all sts
            in `structures` finish docking. Pass False if structures is a
            generator that can have more structures added to it at runtime.
        """
        self._resetBatch()
        self._structure_queue = iter(structures)
        if not done_adding:
            # setDoneAdding calls _update itself, so only this path needs an
            # explicit call.
            self._update()
            return
        self.setDoneAdding()

    def cancelBatch(self):
        """
        Request cancellation of the current batch; it is finished on the next
        state update that finds no docking in progress.
        """
        self._cancel_requested = True
        self._update()

    def addStructures(self, structures):
        """
        Add more structures to dock. Should call `setDoneAdding` once all
        structures have been added.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]
        """
        self._structure_queue = itertools.chain(self._structure_queue,
                                                structures)
        self._update()

    def setDoneAdding(self):
        """
        Call this when done adding structures to a batch; once all structures
        have been docked, `batchFinished` will be emitted.
        """
        self._done_adding = True
        self._update()

    def _update(self):
        """
        Update the server manager state.

        - Docks the next structure if not already docking
        - Emits batchFinished if all structures are docked and no more
          structures will be added
        """
        if self._docking_active:
            # A ligand is in flight; _onDockFinished will call back in.
            return
        if self._cancel_requested:
            self._backend_checker.stopChecking()
            self._finishBatch()
            return
        upcoming = next(self._structure_queue, None)
        if upcoming is None:
            if self._done_adding:
                self._finishBatch()
            return
        self._batch_size += 1
        self.dock(upcoming)

    def _finishBatch(self):
        """
        Emit batchFinished with the number of ligands submitted, then reset
        the batch state.
        """
        self.batchFinished.emit(self._batch_size)
        self._resetBatch()

    def _resetBatch(self):
        """
        Reset the batch bookkeeping: empty queue, zero count, flags cleared.
        """
        self._batch_size = 0
        self._structure_queue = iter(())
        self._done_adding = False
        self._cancel_requested = False

    @QtCore.pyqtSlot()
    def _onDockFinished(self):
        """
        When docking finishes, set docking active to False and update state.
        """
        self._docking_active = False
        self._update()

    def _getClient(self):
        """
        Wait for the backend to become ready and return the client. If the
        backend is not ready, errorOccurred is emitted and the batch is
        finished before the client is looked up.
        """
        checker = self._backend_checker
        checker.wait()
        if not checker.ready:
            exc = checker.error or SERVER_NOT_READY_MESSAGE
            self.errorOccurred.emit(str(exc))
            self._finishBatch()
        return self.client

    @property
    def client(self):
        """
        The NonBlockingHTTPClient for this server, created on first access
        with its signals forwarded to this object's signals.

        :raises ValueError: if the server config is not available yet
        """
        if not self.config:
            raise ValueError('Glide server is not ready')
        if self._client is None:
            self._client = client = NonBlockingHTTPClient(
                host=self.config['host'], port=self.config['port'])
            client.posesDocked.connect(self.posesDocked)
            client.noPosesDocked.connect(self.noPosesDocked)
            client.errorOccurred.connect(self.errorOccurred)
            client.finished.connect(self._onDockFinished)
        return self._client
class NonBlockingBackendChecker(QtCore.QObject):
    """
    Class to get a GlideServerManager object to the ready state without
    blocking the GUI.

    :ivar ready: Whether the backend is ready
    :ivar error: The error that ended the last wait, or None
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ready = False
        self.error = None
        self._backend = None
        # Polls the backend every 2 s while a wait is in progress.
        self._timer = QtCore.QTimer()
        self._timer.setInterval(2000)
        self._timer.timeout.connect(self._checkBackend)
        # Nested event loop that wait() runs so the GUI stays responsive.
        self._event_loop = QtCore.QEventLoop()

    def wait(self, timeout=MAX_WAIT):
        """
        Block python execution (but not the GUI) until the backend is ready
        or the timeout is reached.

        :param timeout: Timeout in seconds or None to not timeout.
        :type timeout: int or NoneType

        :return: Whether the backend is ready

        :raises RuntimeError: if no backend has been set
        """
        if self._backend is None:
            raise RuntimeError("Must call `setBackend` before `wait`")
        self.ready = False
        self.error = None

        # Check once and return early if it's already ready or an exception
        # occurred
        self._checkBackend()
        if self.ready or self.error is not None:
            return self.ready

        # Otherwise, check on a timer
        if timeout == 0:
            self.onServerTimeout()
            return self.ready

        # Use a dedicated timer for the deadline so that it can be cancelled
        # when the wait ends early. A fire-and-forget single shot would still
        # go off later, setting `self.error` after a successful wait and
        # quitting whichever wait happens to be running at that time.
        timeout_timer = None
        if timeout is not None:
            timeout_timer = QtCore.QTimer()
            timeout_timer.setSingleShot(True)
            timeout_timer.timeout.connect(self.onServerTimeout)
            timeout_timer.start(int(timeout * 1000))
        self._timer.start()
        try:
            self._event_loop.exec()
        finally:
            if timeout_timer is not None:
                timeout_timer.stop()
        return self.ready

    def setBackend(self, backend):
        """
        Set the backend object to call isReady on. Any check in progress is
        stopped.

        :type backend: GlideServerManager
        """
        self.stopChecking()
        self._backend = backend
        self.ready = False

    def stopChecking(self):
        """
        Stop checking whether the backend is ready
        """
        self._timer.stop()
        self._event_loop.quit()

    @QtCore.pyqtSlot()
    def onServerTimeout(self, *, msg=None):
        """
        Record a ServerTimedOutError in `.error` and stop checking.

        :param msg: error message; a generic one is used when omitted
        :type msg: str or NoneType
        """
        if msg is None:
            msg = "Server was not ready in time"
        self.error = ServerTimedOutError(msg)
        self.stopChecking()

    def _checkBackend(self):
        """
        Poll the backend once. Stops checking when the backend is ready, when
        the readiness check raises a job launch failure, or when the backend
        job has already finished or died.
        """
        try:
            self.ready = self._backend.isReady()
        except jobcontrol.JobLaunchFailure as exc:
            self.error = exc
            self.stopChecking()
        if self.ready:
            self.stopChecking()
        else:
            job = self._backend.job
            if job and job.Status in ('finished', 'died'):
                # The job is gone, so it can never become ready.
                if job.Status == 'finished':
                    msg = SERVER_FINISHED_MESSAGE
                else:
                    msg = "Server died"
                self.onServerTimeout(msg=msg)
class ZmqClient(QtCore.QObject):
    """
    Connect to a Glide ZMQ driver and submit ligands for docking; results
    arrive asynchronously through signals. A Glide ZMQ driver is a Glide job
    launched with the -mq and -server flags.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.

    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses

    :ivar noMoreLigs: Signal emitted when there are no more ligands left to
        dock in the server's queue.

    :ivar gotStatus: Signal emitted when the server sends a status message.
        Emitted with a dict with status information. Current keys include
        "elapsed_time", "ligs_done", "poses_stored", "ligs_per_sec",
        "lps_per_worker", "active_workers", "idle_workers", and
        "lost_workers".
    """

    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    noMoreLigs = QtCore.pyqtSignal()
    gotStatus = QtCore.pyqtSignal(dict)

    def __init__(self, url):
        """
        :param url: URL of server to connect to.
        :type url: str
        """
        super().__init__()
        # Last ligand number sent; used to generate sequential numbers.
        self._lignum = 0
        context = zmq.Context.instance()
        self._sock = context.socket(zmq.DEALER)
        self._sock.connect(url)

        fd = self._sock.getsockopt(zmq.FD)
        self._notifier = QtCore.QSocketNotifier(fd,
                                                QtCore.QSocketNotifier.Read,
                                                self)
        self._notifier.activated.connect(self._getMessages)

        # For some reason, I can't get the QSocketNotifier above to work
        # reliably; when I test this class by itself it works, but hooked up to
        # the server manager / ligand designer, I don't get any notifications.
        #
        # Perhaps the raw socket by this point already has some readable data
        # due to low-level ZMQ traffic, and the notifier only notifies about
        # the transition from "no data" to "data", and not from "data" to
        # "more data"?
        #
        # Polling the ZMQ socket here sometimes seemed to clear that up, but
        # even that was not reliable enough, so to be safe, we'll check for
        # incoming messages periodically using a timer.
        self._timer = QtCore.QTimer(self)
        self._timer.setInterval(1000)
        self._timer.timeout.connect(self._getMessages)
        self._timer.start()

    def dock(self, st, lignum=None):
        """
        Send a ligand to the server to dock. Poses are returned asynchronously
        via the posesDocked and noPosesDocked signals.

        :param st: Structure to dock
        :type st: schrodinger.structure.Structure

        :param lignum: ligand index (stored by Glide in the i_i_glide_lignum
            property). It can be used to tell which ligand is which, because
            the poses are not in general returned in the same order in which
            the ligands are sent to the server. If not provided, a sequential
            number will be used. Note that unique numbers should be used,
            because lignum is used as a key by the server!
        :type lignum: int or NoneType
        """
        # Compare against None rather than testing truthiness, so that an
        # explicitly requested lignum of 0 is honored instead of being
        # replaced by the next sequential number.
        if lignum is not None:
            self._lignum = lignum
        else:
            self._lignum += 1
        blob = st.writeToString(structure.MAESTRO).encode('utf-8')
        msg = {'cmd': 'LIGS', 'ligs': [(self._lignum, blob)]}
        self._sock.send_pyobj(msg)

    @QtCore.pyqtSlot()
    def _getMessages(self):
        """
        Receive and process any messages from the ZMQ socket.
        """
        # poll(1) waits at most 1 ms, so this drains what is available
        # without stalling the event loop.
        while self._sock.poll(1):
            msg = self._sock.recv_pyobj()
            if msg['cmd'] == 'POSES':
                poses = [st_from_blob(blob) for blob in msg['blobs']]
                if poses:
                    self.posesDocked.emit(poses)
                else:
                    self.noPosesDocked.emit()
                if msg['queue_len'] == 0:
                    self.noMoreLigs.emit()
            elif msg['cmd'] == 'INFO':
                self.gotStatus.emit(msg['status'])

    def shutdown_server(self):
        """
        Send the server a STOP command.
        """
        self._sock.send_pyobj({'cmd': 'STOP'})

    def cancel(self):
        """
        Send the server a CANCEL command.
        """
        self._sock.send_pyobj({'cmd': 'CANCEL'})

    def getStatus(self):
        """
        Request a status update from the server. When the response comes, the
        gotStatus signal is emitted with a dict containing status information.
        """
        self._sock.send_pyobj({'cmd': 'STATUS'})

    def disconnect(self):
        """
        Disconnect the ZMQ socket and stop listening.
        """
        self._notifier.setEnabled(False)
        self._timer.stop()
        self._sock.close()
class NonBlockingGlideServerManagerZmq(AbstractGlideServerManager,
                                       QtCore.QObject):
    """
    Launch and use Glide ZMQ server without blocking. The server and workers
    are run on localhost, but future versions might relax this restriction.

    This class is meant as a drop-in replacement for
    NonBlockingGlideServerManager, but there are a couple of subtle
    differences in behavior:

    1) when docking a batch, all ligands are sent immediately to the server,
       which keeps its own queue. This could be a problem when feeding an
       infinite iterable to dockBatch().

    2) cancelBatch() causes any currently docking ligands to be aborted,
       unlike NonBlockingGlideServerManager which waits for the current
       ligand to complete.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.
    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses
    :ivar errorOccurred: Signal emitted when there is an error communicating
        with the server.
    :ivar batchFinished: Signal emitted when a docking batch finishes. Emitted
        with the number of ligands that were docked.
    :ivar gotStatus: Signal emitted when the server sends a status message.
        Emitted with a dict with status information. Current keys include
        "elapsed_time", "ligs_done", "poses_stored", "ligs_per_sec",
        "lps_per_worker", "active_workers", "idle_workers", and
        "lost_workers".
    """

    config_ext = '_zmq.json'

    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    batchFinished = QtCore.pyqtSignal(int)
    errorOccurred = QtCore.pyqtSignal(str)
    gotStatus = QtCore.pyqtSignal(dict)

    def __init__(self, *args, nworkers=2, **kwargs):
        """
        Most arguments are passed through to GlideServerManager; see that
        class for documentation. Arguments specific to this class:

        :param nworkers: number of workers to launch.
        :type nworkers: int
        """
        super().__init__(*args, **kwargs)
        self.nworkers = nworkers
        # The ZmqClient is created lazily by the `client` property.
        self._client = None
        self._backend_checker = NonBlockingBackendChecker()
        self._backend_checker.setBackend(self)
        self._resetBatch()
        server_keywords = {**self.keywords, 'JOBNAME': self.jobname}
        self.glide_job = glide.Dock(server_keywords)

    def stop(self, *args, **kwargs):
        """
        Stop checking the backend, stop the server via the parent class, and
        disconnect the ZMQ client if one was created.

        All arguments are passed through to the parent class's `stop`.
        """
        self._backend_checker.stopChecking()
        super().stop(*args, **kwargs)
        if self._client is not None:
            self._client.disconnect()
            # Drop the cached client so that a later access to `client`
            # builds a fresh connection instead of handing back an object
            # whose socket has already been closed.
            self._client = None

    def dock(self, st):
        """
        Send a single structure to the server and count it as sent.

        :param st: Structure to dock
        :type st: schrodinger.structure.Structure
        """
        client = self._getClient()
        client.dock(st)
        self._sent += 1

    def dockBatch(self, structures, done_adding=True, reset=True):
        """
        Set a batch of structures to dock.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]

        :param bool done_adding: Whether to emit batchFinished when all sts in
            `structures` finish docking. Pass False if structures is a
            generator that can have more structures added to it at runtime.

        :param bool reset: Whether to reset the batch before adding. Pass
            False to expand the current batch instead of starting a new one.
        """
        if reset:
            self._resetBatch()
        for st in structures:
            self.dock(st)
        if done_adding:
            self.setDoneAdding()

    def cancelBatch(self):
        """
        Cancel the current batch. A 'CANCEL' request is sent to the server
        only when a server config is available; the batch is finished
        locally either way.
        """
        if self.config:
            self.client.cancel()
        self._finishBatch()

    def addStructures(self, structures):
        """
        Add more structures to dock. Should call `setDoneAdding` once all
        structures have been added.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]
        """
        self._done_adding = False
        self.dockBatch(structures, done_adding=False, reset=False)

    def setDoneAdding(self):
        """
        Call this when done adding structures to a batch; once all structures
        have been docked, `batchFinished` will be emitted.
        """
        self._done_adding = True
        # Everything already came back: nothing left to wait for.
        if self._sent == self._received:
            self._finishBatch()

    @property
    def client(self):
        """
        The `ZmqClient` connected to the server, created on first access and
        wired to this object's slots and `gotStatus` signal.

        :raise ValueError: if there is no server config yet.
        """
        if not self.config:
            raise ValueError('Glide server is not ready')
        if self._client is None:
            self._client = ZmqClient(self.config['url'])
            self._client.posesDocked.connect(self._onPosesDocked)
            self._client.noPosesDocked.connect(self._onNoPosesDocked)
            self._client.noMoreLigs.connect(self._onNoMoreLigs)
            # Signal-to-signal connection: status messages are re-emitted
            # unchanged.
            self._client.gotStatus.connect(self.gotStatus)
        return self._client

    def getStatus(self):
        """
        Request a status update from the server; the answer arrives via the
        `gotStatus` signal.
        """
        client = self._getClient()
        client.getStatus()

    def _getClient(self):
        """
        Wait for the backend checker and return the client.

        If the backend is not ready, `errorOccurred` is emitted with the
        checker's error (or a generic message) and the current batch is
        finished. The `client` property is still evaluated afterwards, so it
        may raise ValueError when no config is available.
        """
        self._backend_checker.wait()
        if not self._backend_checker.ready:
            exc = self._backend_checker.error or SERVER_NOT_READY_MESSAGE
            self.errorOccurred.emit(str(exc))
            self._finishBatch()
        return self.client

    def _finishBatch(self):
        """
        Emit `batchFinished` with the number of results received so far and
        reset the batch counters.
        """
        self.batchFinished.emit(self._received)
        self._resetBatch()

    def _resetBatch(self):
        """
        Reset the sent/received counters and the done-adding flag.
        """
        self._sent = 0
        self._received = 0
        self._done_adding = False

    def _setupJob(self):
        """
        Write the input file and return the command line arguments which are
        used whether launching the driver under job control or not.
        """
        infile = os.path.join(self.jobdir, self.jobname + '.in')
        self.glide_job.writeSimplified(infile)
        cmd = [
            '-HOST', f'localhost:{self.nworkers}', '-NJOBS',
            str(self.nworkers), '-server', '-mq', '-bind_local', infile
        ]
        return cmd

    def start(self, *args, **kwargs):
        """
        Start the server via the parent class, with the job directory as the
        working directory for the duration of the call.
        """
        with fileutils.chdir(self.jobdir):
            return super().start(*args, **kwargs)

    def _startJC(self):
        """
        Launch the Glide driver under job control.
        """
        common_args = self._setupJob()
        cmd = [GLIDE] + common_args
        logger.debug("Launching: %s", ' '.join(cmd))
        self.job = jobcontrol.launch_job(cmd)

    def _startNoJC(self):
        """
        Launch the Glide driver as a plain subprocess, with the log file
        `<jobname>.log` in the job directory passed to the job adapter.
        """
        common_args = self._setupJob()
        cmd = [RUN, '-FROM', 'glide', 'glide_driver.py'] + common_args
        logger.debug("Launching: %s", ' '.join(cmd))
        logfile = os.path.join(self.jobdir, self.jobname + '.log')
        # NOTE(review): the handle is closed as soon as the adapter is
        # constructed; this assumes the child process has already received
        # it by then -- confirm against SubprocessJobAdapter.
        with open(logfile, 'w') as fh:
            self.job = SubprocessJobAdapter(cmd, fh)

    @QtCore.pyqtSlot(list)
    def _onPosesDocked(self, poses):
        """
        Count a received result and forward the poses via `posesDocked`.
        """
        self._received += 1
        self.posesDocked.emit(poses)

    @QtCore.pyqtSlot()
    def _onNoPosesDocked(self):
        """
        Count a received result and forward it via `noPosesDocked`.
        """
        self._received += 1
        self.noPosesDocked.emit()

    @QtCore.pyqtSlot()
    def _onNoMoreLigs(self):
        """
        Finish the batch when the server queue is empty, but only if the
        caller has declared that no more structures will be added.
        """
        if self._done_adding:
            self._finishBatch()
def st_from_blob(blob):
    """
    Build a Structure from a compressed blob.

    The blob is inflated with zlib, decoded as UTF-8 text, and handed to
    `StructureReader.fromString`; only the first structure read is returned.

    :param blob: compressed m2io representation of a structure
    :type blob: bytes

    :return: Structure
    :rtype: schrodinger.structure.Structure
    """
    raw_bytes = zlib.decompress(blob)
    mae_text = raw_bytes.decode('utf-8')
    reader = structure.StructureReader.fromString(mae_text)
    return next(reader)
if __name__ == '__main__':
    # For standalone testing, we'll read a ligand file, POST it to the server
    # one ligand at a time, and write the poses we get back to an output file.
    if len(sys.argv) != 5:
        sys.exit("Usage: %s <host> <port> <ligfile> <output-posefile>" %
                 sys.argv[0])
    host, port, infile, outfile = sys.argv[1:]
    client = HTTPClient(host=host, port=port)
    # Use the writer as a context manager so the output file is closed even
    # if docking raises part-way through the input.
    with structure.StructureWriter(outfile) as writer:
        for ct in structure.StructureReader(infile):
            print("docking ligand: %s" % ct.title)
            poses = client.dock(ct)
            for pose in poses:
                # Poses without a GlideScore property are reported as 0.0.
                try:
                    gscore = pose.property['r_i_glide_gscore']
                except KeyError:
                    gscore = 0.0
                print(" writing pose; gscore=%f" % gscore)
                writer.append(pose)