Source code for schrodinger.application.desmond.feputils

import os
import tempfile

import schrodinger
import schrodinger.protein._reliability as structure_reliability
from schrodinger import project
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.protein import reliability as prot_reliability
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.Qt.QtCore import Qt
from schrodinger.structutils import analyze
from schrodinger.ui.qt import widgetmixins
from schrodinger.ui.qt.protein_health_viewer import ProteinHealthViewer
from schrodinger.ui.qt.standard.icons import icons
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess

from . import constants

maestro = schrodinger.get_maestro()

ENTRY_ID_KEY = 's_fepmapper_entryid'
NO_CACHE_STRING = 'No cache string'
VACUUM_OPTION = '-vacuum'
FFBUILDER_ARG = '-ffbuilder'
FFBUILDER_HOST_ARG = '-ff-host'
FFBUILDER_HOST_SETTING = 'ffb_host'
FFBUILDER_SUBJOBS_SETTING = 'max_ffb_subjobs'


[docs]def run_protein_reliability(protein_file): """ Takes a protein ct and returns a message indicating problems, if any. WARNING: This function runs *very* slowly (i.e., easily around 20 seconds). :param protein_file: A protein to assess :type protein_file: `schrodinger.structure.Structure` :return: A tuple that contains a message indicating any problems with the protein and model that can be used to bring up protein reliability panel. :rtype: tuple """ outputfile, popen, reportfile = launch_protein_reliability(protein_file) _, std_err = popen.communicate() msg, model = read_protein_reliability(popen, std_err, reportfile, outputfile) return msg, model
[docs]def read_protein_reliability(p, std_err, reportfile, outputfile): if p.returncode != 0: msg = ("ERROR: Failed to run protein reliability report.\n\n" "Error output:\n%s" % std_err) return msg, None # generate message that will be used as a tooltip prot_probs = [] with open(reportfile, 'r') as fh: for line in fh: prot_probs.append(line.strip()) msg = "" if prot_probs: PROT_PROBS_SEPARATOR = '\n - ' msg = "Potential receptor issues:" + PROT_PROBS_SEPARATOR msg += PROT_PROBS_SEPARATOR.join(prot_probs) # store model that will be used to show protein reliability report protein = structure.Structure.read(outputfile) model = structure_reliability.ModelCheck(protein, do_calc=False) fileutils.force_remove(reportfile, outputfile) return msg, model
[docs]def launch_protein_reliability(protein_file): # Running in a separate process in order to prevent blocking of the # GUI, see PANEL-2976 run = os.path.join(os.environ['SCHRODINGER'], 'run') script = prot_reliability.__file__ reportfile = tempfile.mktemp('.txt', 'prot_probs_') outputfile = tempfile.mktemp('.mae', 'prot_rel_') popen = subprocess.Popen( [run, script, protein_file, reportfile, outputfile], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) return outputfile, popen, reportfile
[docs]def is_prepped(protein): """ Check if a protein has been prepared in Protein PrepWizard :param protein: protein to check :type protein: protein: `schrodinger.structure.Structure` """ return protein.property.get('b_ppw_prepared', False)
[docs]class ProteinProcessManager(object): """ A lightweight thread that spawns the protein reliability process to run in the background. """
[docs] def __init__(self, protein_file): self.protein_file = protein_file self.message = '' self.model = None self.popen = None self._is_running = False
[docs] def start(self): outputfile, popen, reportfile = launch_protein_reliability( self.protein_file) self.outputfile, self.reportfile = outputfile, reportfile self.popen = popen self._is_running = True
[docs] def poll(self): retcode = self.popen.poll() if retcode is not None: _, std_err = self.popen.communicate() self.message, self.model = read_protein_reliability( self.popen, std_err, self.reportfile, self.outputfile) self._is_running = False finished = not self._is_running return finished
[docs] def terminate(self): if self.popen is not None: # subprocess needs to be terminated or it will be stranded self.popen.terminate() if self.outputfile and self.reportfile: fileutils.force_remove(self.reportfile, self.outputfile) self._is_running = False
[docs] def isRunning(self): return self._is_running
[docs] def results(self): return self.message, self.model
[docs]class LabelSpinner(QtWidgets.QLabel): """ A label that can replace its icon with a spinning progress icon. """ DEFAULT_ICON_HEIGHT = 10 ANIMATION_TIME = 250 animationTimeElapsed = QtCore.pyqtSignal()
[docs] def __init__(self, text=None, parent=None): super(LabelSpinner, self).__init__(text, parent) self.updatePermanentPic() self.updateAnimationSize() self.current_num = 0 self.timer = QtCore.QTimer() self.timer.setInterval(self.ANIMATION_TIME) self.timer.timeout.connect(self.animationTimeElapsed) self.animationTimeElapsed.connect(self.updateAnimation)
[docs] def updateAnimationSize(self): """ Set the size of the spinner animation to the height of the current permanent icon, or else some default value if it is not available. """ if self.permanent_pic: height = self.permanent_pic.height() else: height = self.DEFAULT_ICON_HEIGHT # Generate the animation frames path_template = ":/images/toolbuttons/jobrunning/{0}.png" icons = [QtGui.QIcon(path_template.format(num)) for num in range(1, 9)] self.pics = [icon.pixmap(height, height) for icon in icons]
[docs] def updatePermanentPic(self): """ Set the permenent icon picture to the current pixmap value and update the size of the spinner animation to match. """ if self.pixmap(): self.permanent_pic = self.pixmap().copy() else: self.permanent_pic = None self.updateAnimationSize()
[docs] def setPixmap(self, pixmap): """ Set a new pixmap, and update the spinner animation frames in response. The `setTemporaryPixmap()` method should be called internally when setting a temporary pixmap for the spinner animation frames and other temporary icon images. :param pixmap: the icon to be displayed by this label :type pixmap: QtGui.QPixmap """ super(LabelSpinner, self).setPixmap(pixmap) self.updatePermanentPic()
[docs] def setTemporaryPixmap(self, pixmap): """ Set the displayed pixmap without overwriting the "permanent" cached pixmap value or altering the size of the animation frames. Meant to be used to display animation frames or other temporary icon images. :param pixmap: the icon to be displayed by this label :type pixmap: QtGui.QPixmap """ super(LabelSpinner, self).setPixmap(pixmap)
[docs] def updateAnimation(self): """ Advance the animation by one frame. This method gets called periodically while the spinner is running. """ pixmap = self.pics[self.current_num] self.current_num += 1 self.current_num %= len(self.pics) self.setTemporaryPixmap(pixmap)
[docs] def start(self): """ Start the spinner animation. """ self.current_num = 0 self.updateAnimation() self.timer.start()
[docs] def stop(self): """ Stop the spinner animation and restore the original icon. """ self.timer.stop() if self.permanent_pic: self.setTemporaryPixmap(self.permanent_pic)
[docs]class BaseSpinnerWidget(widgetmixins.InitMixin, QtWidgets.QWidget): """ A widget that maintains a label and a button for displaying health check status information. :cvar tooltip_calculating: the tooltip text for when the health is being calculated :vartype tooltip_calculating: str """ tooltip_calculating = ''
[docs] def initSetOptions(self): super().initSetOptions() self.check_msg = '' self.input_file = None self.model = None self.image_height = 14 self.warning_icon = QtGui.QIcon(icons.VALIDATION_WARNING_LB) self.blank_pixmap = QtGui.QPixmap(self.image_height, self.image_height) color = QtGui.QColor(0, 0, 0, 0) self.blank_pixmap.fill(color)
[docs] def initSetUp(self): super().initSetUp() self.label = LabelSpinner(parent=self) self.health_btn = QtWidgets.QToolButton(parent=self) # Use "warning" icon as default icon = self.warning_icon self.setButtonIcon(icon) height = self.image_height pixmap = icon.pixmap(height, height) self.setLabelPixmap(pixmap)
[docs] def initLayOut(self): super().initLayOut() layout = QtWidgets.QHBoxLayout() layout.addWidget(self.label) layout.addWidget(self.health_btn) self.widget_layout.addLayout(layout)
[docs] def initFinalize(self): super().initFinalize() self.showSubwidgets(False, False)
[docs] def showSubwidgets(self, show_label, show_button): """ Convenience method for showing or hiding the label and button. :raise: ValueError :param show_label: whether to show the label :type show_label: bool :param show_button: whether to show the button :type show_button: bool """ if show_label and show_button: msg = ('The spinner label and health button should not be visible' ' at the same time.') raise ValueError(msg) self.label.setVisible(show_label) self.health_btn.setVisible(show_button)
[docs] def start(self): """ Display a spinning animation in the label widget. """ self.showSubwidgets(True, False) self.label.start()
[docs] def stop(self): """ Stop the animation and restore the original icon. """ self.label.stop() self.showSubwidgets(True, False)
def _updateButtonImageHeight(self): """ Update the button image to used the stored height. """ size = QtCore.QSize(self.image_height, self.image_height) self.health_btn.setIconSize(size)
[docs] def setButtonIcon(self, icon): """ :param icon: the icon to display in the button :type icon: QtGui.QIcon """ self.health_btn.setIcon(icon) self._updateButtonImageHeight()
[docs] def setLabelPixmap(self, pixmap): """ :param pixmap: the pixmap to display in the label :type pixmap: QtGui.QPixmap """ pixmap = self._getCorrectSizePixmap(pixmap) self.label.setPixmap(pixmap)
def _getCorrectSizePixmap(self, pixmap): """ :param pixmap: a pixmap :type pixmap: QtGui.QPixmap :return: the same pixmap, resized to a `image_height`-sized square :rtype: QtGui.QPixmap """ if pixmap.isNull(): return pixmap height = self.image_height return pixmap.scaled(height, height, Qt.IgnoreAspectRatio, Qt.SmoothTransformation)
[docs] def clearLabelPixmap(self): """ Replace the label pixmap with a transparent pixmap. """ pixmap = self._getCorrectSizePixmap(self.blank_pixmap) self.label.setPixmap(pixmap)
[docs] def setLabelToolTip(self, text): self.label.setToolTip(text)
[docs] def setButtonToolTip(self, text): self.health_btn.setToolTip(text)
[docs]class ProteinCheckSpinnerWidget(BaseSpinnerWidget): """ A protein checker widget that includes a button to display the health status and a label to display a spinner. Shows a spinning animation in the label while the check is running. :cvar receptorHealthChanged: signal emitted containing the tooltip for the visible widget (label or button) :vartype receptorHealthChanged: QtCore.pyqtSignal(str) """ results_cache = {} tooltip_calculating = 'Calculating protein health' receptorHealthChanged = QtCore.pyqtSignal(str)
[docs] def __init__(self, parent=None, use_cache=False): """ :param use_cache: whether or not to cache results based upon the string representation, which introduces latency in the string generation, but saves completed results and checks if the protein changes before starting a new check thread when calling `setProtein`. Defaults to `False`. :type use_cache: bool """ self._use_cache = use_cache super().__init__(parent=parent)
[docs] def initSetOptions(self): super().initSetOptions() self.process_manager = None self.protein = None self.protein_string = ''
[docs] def initSetUp(self): super().initSetUp() self.label.animationTimeElapsed.connect(self.update) self.health_btn.clicked.connect(self.showProteinHealthReport)
[docs] def initSetDefaults(self): super().initSetDefaults() self.setProtein(None)
[docs] def start(self): """ Start the protein health check and spinner animation. """ if not self.startValidate(): self.finish() return super().start() self.input_file = fileutils.tempfilename('val_prot_', '.maegz') self.process_manager = ProteinProcessManager(self.input_file) self.protein.append(self.input_file) self.process_manager.start() self.updateToolTips()
[docs] def startValidate(self): """ :return: whether a health check needs to be run :rtype: bool """ if self.protein_string in self.results_cache: self.check_msg, self.model = self.results_cache[self.protein_string] return False return True
[docs] def stop(self): """ Stop the spinner animation and health check process. """ super().stop() if self.process_manager and self.process_manager.isRunning(): self.process_manager.terminate() if self.input_file: self.input_file.remove()
[docs] def update(self): """ Check if the reliability calculation is finished and if so, stop the spinner and display the results. """ if self.process_manager.poll(): self.stop() self._getProcessResults() self.finish()
def _getProcessResults(self): """ Pull down the results of the finished process """ self.check_msg, self.model = self.process_manager.results() if self._use_cache: self.results_cache[self.protein_string] = (self.check_msg, self.model)
[docs] def updateToolTips(self): """ Update widget tooltips depending on the status of the health check. """ if self.process_manager is not None: self.setButtonToolTip('') self.setLabelToolTip(self.tooltip_calculating) self.receptorHealthChanged.emit(self.tooltip_calculating) return tooltip = self.check_msg self.receptorHealthChanged.emit(tooltip) if not is_prepped(self.protein): if tooltip: tooltip += '\n' tooltip += ('Note: For best results, prepare the receptor using' ' prepwizard') if tooltip: tooltip += '\n\n' tooltip += "Click to view Protein Reliability Report." self.setButtonToolTip(tooltip) self.setLabelToolTip('')
[docs] def finish(self): """ Display the health check results. If there are any problems, the warning label is set to visible and the tooltip will report the issue. If there are no problems, the label is hidden. """ health_check = not self.check_msg self.showSubwidgets(health_check, not health_check) if health_check: self.clearLabelPixmap() self.label.setText("<font color='green'>OK</font>") self.process_manager = None self.updateToolTips()
[docs] def setProtein(self, protein): """ Set the protein for the spinner, stopping and restarting the spinner with the new protein if it has changed or if `_use_cache` is `False`. :param protein: the new protein :type protein: structure.Structure """ self.stop() if self._use_cache and protein is not None: protein_string = protein.__getstate__() else: protein_string = NO_CACHE_STRING if not self._use_cache or protein_string != self.protein_string: self.protein = protein self.protein_string = protein_string if protein is not None: self.start()
[docs] def showProteinHealthReport(self): """ Bring up protein reliability panel to show results of protein check. """ if not self.model: return viewer = ProteinHealthViewer(flags=Qt.Dialog) viewer.loadModel(self.model) viewer.show()
[docs]def truncate_label(label, label_string, max_chars=40): """ Sets the text on a label to label_string, truncating the text if necessary and setting the full text in a tooltip, if the text has been truncated. The original tooltip text will be returned. :param label: the label to modify :type label: QtWidgets.QLabel :param label_string: the text for the label :type label_string: str :param max_chars: maximum number of characters in the label text :type max_chars: int :return: the original tooltip text :rtype: str """ original_tooltip = label.toolTip() new_tooltip = "" if len(label_string) > max_chars: cutoff = max_chars - 3 new_tooltip = label_string label_string = label_string[:cutoff] + "..." label.setText(label_string) if new_tooltip: label.setToolTip(new_tooltip) return original_tooltip
[docs]def get_protein_label_string(ct): """ Returns a string for use in GUI to indicate the protein. Returns the title if available; otherwise returns a short description. If called with None, returns "--". :param ct: the protein :type ct: `schrodinger.structure.Structure` """ if not ct: return '--' if not ct.title.strip(): plural_chain = "s" if len(ct.chain) > 1 else "" plural_res = "s" if len(ct.residue) > 1 else "" ct_info = str(len(ct.chain)) + " chain%s; " % plural_chain ct_info += str(len(ct.residue)) + " residue%s; " % plural_res ct_info += str(len(ct.atom)) + " atoms" return ct_info return ct.title
[docs]def format_structure_label(ct, label, max_chars=40): """ Sets the protein name on a label, truncating the text if necessary and setting the untruncated text in a tooltip, if the text has been truncated. :param ct: the protein :type ct: `schrodinger.structure.Structure` :param label: The label to modify :type label: QtWidgets.QLabel :param max_chars: The max number of characters to allow on the label :type max_chars: int """ label_string = get_protein_label_string(ct) truncate_label(label, label_string, max_chars) return label
[docs]def get_proteins(ct_list): """ Iterates through a list of structures and returns only the proteins :param ct_list: a list of structures :type ct_list: list """ proteins = [] for ct in ct_list: if analyze.evaluate_asl(ct, "protein"): proteins.append(ct) return proteins
[docs]def get_ligands(ct_list): """ Iterates through a list of structures and returns only the ligands. :param ct_list: a list of structures :type ct_list: list """ def peptidic_ligand(ligand): n_protein_atoms = len(analyze.evaluate_asl(ct, "protein and not a.e H")) if n_protein_atoms < 50: return True return False ligands = [] for ct in ct_list: if analyze.evaluate_asl(ct, "ligand") and peptidic_ligand(ct): ligands.append(ct) return ligands
[docs]def import_pt_entries(): """ Imports selected entries from the project table. For convenience, workspace-included proteins are also imported. :return: a list of imported structures. Each structure is tagged with a property 's_fepmapper_entryid' to store the entry id. :rtype: list """ if not maestro: return [] try: pt = maestro.project_table_get() except project.ProjectException: return [] if not pt.selected_rows: return [] ct_list = [] entry_id_list = [] # Only proteins are imported via inclusion; ligands must be selected for row in pt.included_rows: ct = row.getStructure() if analyze.evaluate_asl(ct, "protein"): ct.property[ENTRY_ID_KEY] = row.entry_id ct_list.append(ct) entry_id_list.append(row.entry_id) for row in pt.selected_rows: ct = row.getStructure() if row.entry_id in entry_id_list: # Don't double-import workspace-included proteins continue ct.property[ENTRY_ID_KEY] = row.entry_id ct_list.append(ct) entry_id_list.append(row.entry_id) return ct_list
[docs]def import_pv_file(filename): """ Imports a list of structures from a structure file. Typically used on PV files, but can be used on any structure file. :param filename: the filename :type filename: str """ return [ct for ct in structure.StructureReader(filename)]
[docs]def get_opls_dir_cmd(opls_dir): """ Construct and return the cmd for the given OPLS directory. :param opls_dir: OPLS directory path :type opls_dir: str :return: a command list for the OPLS directory :rtype: list[str] """ opls_path = mm.get_archive_path(opls_dir) return ['-OPLSDIR', opls_path]
[docs]def get_restart_opls_dir(jobname): """ Create opls dir command for use in restarting/extending scripts :param jobname: Jobname :type jobname: str :return: a command list for the OPLS directory :rtype: list[str] """ return f'{jobname}-out.opls'
[docs]def make_fep_cmd( cd_params, ao, jobname, struct_fname, opt=[], # noqa: M511 opls_dir=None, use_ffbuilder=False): """ Generates an FEP command list based on the specified parameters. :param cd_params: config dialog parameters :type cd_params: dict :param ao: FEP Advanced Options (AO) parameters :type ao: dict :param jobname: the jobname :type jobname: str :param main_msj_fname: the filename for the main msj file :type main_msj_fname: str :param struct_fname: the filename for the input structure :type struct_fname: str :param opls_dir: OPLS directory path :type opls_dir: str or None :param use_ffbuilder: whether ffb_fep_plus as opposed to fep_plus should be used in the command as well as in the written start script (only). :type use_ffbuilder: bool :return: a command list for launching the job :rtype: list """ cmd = [ f'{os.environ["SCHRODINGER"]}/fep_plus', '-HOST', cd_params['host'], '-SUBHOST', cd_params['subjob_host'], '-ppj', str(cd_params['cpus']) ] if use_ffbuilder: ffb_host = cd_params[FFBUILDER_HOST_SETTING] ffb_subjobs = cd_params[FFBUILDER_SUBJOBS_SETTING] cmd += generate_ffbuilder_options(ffb_host, ffb_subjobs) maxjob = cd_params.get('maxjobs') if maxjob: cmd += ['-maxjob', str(maxjob)] sim_time = ao['sim_time'] * 1000 if sim_time != 5000.0: cmd += ['-time', str(sim_time)] cmd += ['-ensemble', ao['ensemble']] if ao['random_seed'] != 2014: cmd += ['-seed', str(ao['random_seed'])] if ao['buffer_size'] != 5.0: cmd += ['-buffer', str(ao['buffer_size'])] if ao.get('add_salt', False): cmd += ['-salt', str(ao['salt_molarity'])] if ao['relative_solvation']: cmd.append(VACUUM_OPTION) if ao['membrane_equilibration']: cmd.append('-membrane') if ao.get('modify_dihe', False): cmd.append('-modify_dihe') custom_charge_mode = ao.get('custom_charge', 'assign') if custom_charge_mode != 'assign': cmd += ['-custom-charge-mode', ao['custom_charge']] cmd += ['-lambda_windows', ao[constants.ALIAS_FEP_NUM_LW_DEFAULT]] cmd += ['-core_hopping_lambda_windows', ao[constants.ALIAS_FEP_NUM_LW_CORE]] cmd += ['-charged_lambda_windows', ao[constants.ALIAS_FEP_NUM_LW_CHARGE]] opls_dir_setting = [] if opls_dir: opls_dir_setting = get_opls_dir_cmd(opls_dir) cmd += opls_dir_setting cmd += [ '-JOBNAME', jobname, struct_fname, ] cmd += opt generate_scripts(cd_params, jobname, cmd, opls_dir_setting, use_ffbuilder=use_ffbuilder) return cmd
[docs]def write_script(fname, cmd): """ Write the list of commands to a script file and make the file executable. :param fname: the filename of the script file :type fname: str :param cmd: the list of commands :type cmd: list of str """ with open(fname, 'w') as f: print(subprocess.list2cmdline(cmd), file=f) st = os.stat(fname) os.chmod(fname, st.st_mode | 0o111)
[docs]def generate_scripts(cd_params, jobname, cmd, opls_dir_setting, use_ffbuilder=False): """ Write the start, restart and extend scripts. :param cd_params: the configuration dialog parameters :type cd_params: dict :param jobname: the job name :type jobname: str :param cmd: the command list :type cmd: list of str :param opls_dir_setting: the opls_dir_setting :type opls_dir_setting: list of str :param use_ffbuilder: whether ffbuilder arguments should be used :type use_ffbuilder: bool """ # start script start_cmd = list(cmd) start_cmd[0] = "$SCHRODINGER/fep_plus" write_script("%s.sh" % jobname, start_cmd) vacuum_args = [VACUUM_OPTION] if VACUUM_OPTION in cmd else [] oplsdir_args = [] if opls_dir_setting or use_ffbuilder: oplsdir_args = ['-OPLSDIR', get_restart_opls_dir(jobname)] sh_cmd = [ "$SCHRODINGER/fep_plus", "-HOST", cd_params['host'], "-SUBHOST", cd_params['subjob_host'], '-JOBNAME', jobname, "-checkpoint", jobname + "-multisim_checkpoint" ] + oplsdir_args + vacuum_args # restart restart_cmd = sh_cmd + ["-RESTART"] write_script("restart_%s.sh" % jobname, restart_cmd) # extend extend_cmd = sh_cmd + ["-extend", jobname + ".edge", "-time", "5000.0"] write_script("extend_%s.sh" % jobname, extend_cmd)
[docs]def generate_ffbuilder_options(ffb_host, ffb_subjobs=0): """ Generates command for using FFBuilder in the FEP executable """ if ffb_subjobs: ffb_host += f':{str(ffb_subjobs)}' return [FFBUILDER_ARG, FFBUILDER_HOST_ARG, ffb_host]