Source code for schrodinger.application.matsci.deswidgets

"""
Contains widgets that are useful in MatSci desmond panels.

Copyright Schrodinger, LLC. All rights reserved.
"""

import os
import shutil
from types import SimpleNamespace

import schrodinger
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import appbase
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci import gutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import multiapp
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci.desmondutils import cms_writer
from schrodinger.application.matsci.desmondutils import \
    get_cms_and_trj_path_from_st
from schrodinger.infra import canvas2d
from schrodinger.models import parameters
from schrodinger.project import utils as pt_utils
from schrodinger.structutils import analyze
from schrodinger.tasks import tasks
from schrodinger.ui.qt import swidgets
from schrodinger.ui.qt.appframework2 import af2
from schrodinger.utils import fileutils

traj = codeutils.get_safe_package('desmond.traj')

maestro = schrodinger.get_maestro()


[docs]def get_row_from_proj_table(struct): """ Given an input structure, return the row from the project table :type st: `schrodinger.structure.Structure` :param st: structure to get associated row of from project table :rtype: `schrodinger.project.project.ProjectRow` :return: the row from project table """ entry_id = struct.property.get(msprops.ENTRY_ID_PROP) if entry_id is None: return None # Get Project Table prj_table = pt_utils.get_PT() if prj_table is None: return None # Get Row try: row = prj_table[entry_id] except KeyError: # Row not present return None return row
[docs]def get_traj_path_from_proj_table(struct): """ Get traj path for the passed structure in maestro from project table :param struct `schrodinger.structure.Structure`: Structure to get associated trajectory :rtype: str or None :return: Path to the associated trajectory for the structure. None if there is no associated cms. """ # Get row from project table row = get_row_from_proj_table(struct) if row is None: return None # Get trj from row trj_dir = row[msprops.TRAJECTORY_FILE_PROP] if trj_dir and not os.path.exists(trj_dir): # Attempt to find it in the original job (sub)directory rather than # whether Maestro thinks the trajectory is stored. trj_dir = jobutils.get_file_path(row, msprops.TRAJECTORY_FILE_PROP) return trj_dir
[docs]def get_cms_path_from_proj_table(struct): """ Get cms path for the passed structure in maestro from project table :param struct `schrodinger.structure.Structure`: Structure to get associated cms :rtype: str or None :return: Path to the associated cms file for the structure. None if there is no associated cms. """ # Get row from project table row = get_row_from_proj_table(struct) if row is None: return None # Get cms from row cms_file = row.cms_file if cms_file and not os.path.exists(cms_file): # Attempt to find it in the original job (sub)directory rather than # whether Maestro thinks it stored a copy of the cms file. cms_file = jobutils.get_file_path(row, msprops.ORIGINAL_CMS_PROP) return cms_file
[docs]def check_cms_trj_properties_are_found(cms_file, trj_dir, source_path): """ Check if the CMS and trj properties were found and raise ValueError if not :param source_path str: Path to source cms and trj folder :param cms_file str: Path to the associated CMS file :param trj_dir str: Path to the associated trajectory :return str,str: Path to the cms file trajectory frames if both are found. :raise ValueError: Raises error if the cms file or trajectory is not found """ # Check if cms and trj properties were found if cms_file is None: raise ValueError(f'The "{msprops.ORIGINAL_CMS_PROP}" property is not ' 'defined in the model system.') if trj_dir is None: raise ValueError( f'The "{msprops.TRAJECTORY_FILE_PROP}" property is not ' 'defined in the model system.') if not os.path.exists(cms_file): # Attempt to find cms path using source path cms_file = os.path.basename(cms_file) cms_file = os.path.join(source_path, cms_file) if not os.path.exists(cms_file): raise ValueError(f'The cms file {cms_file} does not exist.') # Check trajectory path if not os.path.exists(trj_dir): trj_dir = os.path.basename(trj_dir) trj_dir = os.path.join(source_path, trj_dir) if not os.path.exists(trj_dir): raise ValueError(f'The trajectory {trj_dir} does not exist.') # Try loading trajectory path try: traj.read_traj(trj_dir) except Exception as msg_error: msg = "Cannot load trajectory: %s" % msg_error raise ValueError(msg) return cms_file, trj_dir
[docs]def get_cms_and_trj_path_from_proj_table(struct): """ Get trajectory and cms path for the passed structure in maestro from project table :param struct `schrodinger.structure.Structure`: Structure to get associated cms and trajectory of :param source_path str: Path to source cms and trj folder :retype: tuple(str or None, str or None) :return: The tuple consists of two elements. The first element is the name of the cms file if it exists, else it will be None. The second element is the name of the trajectory path if it exists, else it will be None. """ cms_file = get_cms_path_from_proj_table(struct) trj_dir = get_traj_path_from_proj_table(struct) return cms_file, trj_dir
[docs]def get_cms_and_trj_path(struct, source_path=None): """ Get trajectory and cms path for the passed structure in maestro :param struct `schrodinger.structure.Structure`: Structure to get associated cms and trajectory of :param source_path str: Path to source cms and trj folder in case they do not exist in the working folder. Structure source path will be used in case it is not passed. :return str,str: Path to the cms file trajectory frames if both are found. :raises ValueError: In case the cms file or trajectory is not found """ # Set source path if not source_path: source_path = jobutils.get_source_path(struct) # Get CMS and trajectory path from project table cms_file, trj_dir = get_cms_and_trj_path_from_proj_table(struct) found_in_workspace = bool(cms_file) & bool(trj_dir) if not found_in_workspace: # Load cms and trajectory file using structure property cms_file, trj_dir = get_cms_and_trj_path_from_st(struct) # Check if cms and trj properties were found cms_file, trj_dir = check_cms_trj_properties_are_found( cms_file, trj_dir, source_path) return cms_file, trj_dir
[docs]class FrameSpinBox(swidgets.SSpinBox): """ A spin box to change and set the frames for the trajectory """
[docs] def __init__(self, layout, commands=None): """ Create a FrameSpinBox instance :type layout: QLayout :param layout: If supplied, the FrameSpinBox created will be added to this layout :type commands: list :param commands: The list of callbacks for the valueChanged signal. """ swidgets.SSpinBox.__init__(self, parent=None, layout=layout) self.setMinimumWidth(80) # Connecting signals to slots if commands: for command in commands: self.valueChanged.connect(command)
[docs]class TrajRangeSelectorDialog(swidgets.SDialog): """ Trajectory range selector dialog with fixed labels, a spinbox for step, and no overlapping of sliders """ DIALOG_SIZE = (490, 160)
[docs] def __init__(self, master): """ Create a TrajRangeSelectorDialog instance """ swidgets.SDialog.__init__(self, master, title='Trajectory Frames') self.setMaximumSize(*self.DIALOG_SIZE) self.setMinimumSize(*self.DIALOG_SIZE)
[docs] def layOut(self): """ Layout the widgets in the Dialog """ layout = self.mylayout # Horizontal layout to add trajectory range selection widget selection_layout = swidgets.SHBoxLayout(layout=layout) # Slider to adjust the start and end limit of frames self.slider = canvas2d.ChmDoubleSlider() # SpinBox to set the start limit of frames self.start_sb = FrameSpinBox( layout=selection_layout, commands=[self.slider.leftPos, self.changeLimitsEnd]) selection_layout.addWidget(self.slider) self.slider.setRange(0.0, 100.0) # SpinBox to set the end limit of frames self.end_sb = FrameSpinBox( layout=selection_layout, commands=[self.slider.rightPos, self.changeLimitsStart]) self.step_sb = swidgets.SLabeledSpinBox('Step size:', minimum=1, value=1, tip=parserutils.TRJ_STEP_HELP, layout=selection_layout, command=self.updateRangeLabel) self.label = swidgets.SLabel('', layout=layout) layout.addSpacing(25) self.traj_range = SimpleNamespace(initial_min=0, initial_max=100.0, temp_min=0, temp_max=100.0, step_size=0.1) # Connecting signals to the slots self.slider.valuesChanged.connect(self.slidersMoved)
[docs] def updateRangeLabel(self): """ Function to change label when spinbox values are changed or slider is moved """ minT = self.start_sb.value() * self.traj_range.step_size maxT = self.end_sb.value() * self.traj_range.step_size self.time_range = "%.2f - %.2f ns" % (minT, maxT) self.label.setText(self.time_range) frames = list(map(str, self.getFrames())) if len(frames) > 6: frames = frames[:3] + ['... '] + frames[-3:] interval = self.getStep() * self.traj_range.step_size text = '\nFrames: ' + ', '.join(frames) text += f'\n\nTime: {self.time_range} at every {interval:.2f} ns' self.label.setText(text)
[docs] def slidersMoved(self): """ Sets values in spinboxes and label when the sliders are moved """ start_pos = round(self.slider.leftPos()) end_pos = round(self.slider.rightPos()) self.start_sb.setRange(self.traj_range.initial_min, end_pos) self.start_sb.setValue(start_pos) self.end_sb.setRange(start_pos, self.traj_range.initial_max) self.end_sb.setValue(end_pos) self.updateRangeLabel()
[docs] def reset(self): """ Reset the widgets """ self.step_sb.reset() self.step_val = 1
[docs] def getStep(self): """ Get the trajectory step :return int: The step """ return self.step_sb.value()
[docs] def getFrames(self): """ Get the frame numbers based on the range and step :return list: list of frames numbers """ start_val = self.start_sb.value() end_val = self.end_sb.value() step = self.getStep() frames = list(range(start_val, end_val, step)) + [end_val] return frames
[docs] def accept(self): """ Save the step value in addition to what parent does """ self.setTempTrajRange() self.step_val = self.getStep() super().accept()
[docs] def reject(self): """ Restore step value in addition to what parent does """ self.setTempRangeToSlider(self.traj_range) self.step_sb.setValue(self.step_val) super().reject()
[docs] def setTrajLimits(self, min_f, max_f, step_size): """ Set the limits for trajectory range :type min_f: float :param min_f: minimum value of the trajectory frames :type max_f: float :param max_f: maximum value of the trajectory frames :type step_size: float :param step_size: step size of the frames """ self.step_sb.setMaximum(max_f) self.slider.setRange(min_f, max_f) self.traj_range = SimpleNamespace(initial_min=min_f, initial_max=max_f, temp_min=min_f, temp_max=max_f, step_size=step_size) self.slidersMoved() self.update()
[docs] def setTempRangeToSlider(self, traj_range): """ Set temporary trajectory range values to the range selection widgets :type traj_range: SimpleNamespace :param traj_range: set of values for the minimum and maximum number of frames and the step size. """ self.slider.leftPos(traj_range.temp_min) self.slider.rightPos(traj_range.temp_max) self.slidersMoved()
[docs] def setTempTrajRange(self): """ Read values from widgets and store the temporary range value in traj_range property """ self.traj_range.temp_min = self.start_sb.value() self.traj_range.temp_max = self.end_sb.value()
[docs] def changeLimitsEnd(self): """ Change limits of end time spin box to reflect the left slider position """ lpos = self.slider.leftPos() self.end_sb.setRange(lpos, self.traj_range.initial_max) self.updateRangeLabel()
[docs] def changeLimitsStart(self): """ Change limits of start time spin box to reflect the right slider position """ rpos = self.slider.rightPos() self.start_sb.setRange(self.traj_range.initial_min, rpos) self.updateRangeLabel()
[docs] def getRangeLabel(self): """ This function returns text string showing trajectory range in ns. :return: range string :rtype: str """ return self.label.text()
[docs]class TrajRangeSelectorFrame(swidgets.SFrame): """ Frame that adds trajectory selection button and label. It is connected to a dialog to select the trajectory range """
[docs] def __init__(self, layout): """ :param `QLayout` layout: layout to add the button to """ super().__init__(layout=layout) trj_layout = swidgets.SHBoxLayout(layout=self.mylayout) self.traj_btn = swidgets.SPushButton( 'Trajectory Frames...', layout=trj_layout, command=self.showTrajRangeSelectorDialog) self.traj_label = swidgets.SLabel('', layout=trj_layout) trj_layout.addStretch() self.traj_dialog = TrajRangeSelectorDialog(self) self.traj_dialog.dbb.accepted.connect(self.updateTrajectoryRangeLabel) self.trj_min_key = jobutils.get_string_from_flag( parserutils.FLAG_TRJ_MIN) self.trj_max_key = jobutils.get_string_from_flag( parserutils.FLAG_TRJ_MAX) self.trj_step_key = jobutils.get_string_from_flag( parserutils.FLAG_TRJ_STEP)
[docs] def updateTrajectoryRangeLabel(self): """ Update the trajectory range label with new range from traj dialog and correct the text """ time_range = self.traj_dialog.time_range num_frames = len(self.traj_dialog.getFrames()) self.traj_label.setText(f'{num_frames} frames, {time_range}')
[docs] def showTrajRangeSelectorDialog(self): """ Show traj dialog """ self.traj_dialog.exec()
[docs] def updateTrj(self, trj_path): """ Load new trajectory range :param str trj_path: path to the trajectory. :return bool: True if trajectory range was loaded, else False """ try: trj = traj.read_traj(trj_path) except Exception: return False nframes = len(trj) time_sec = [fr.time / 1000.0 for fr in trj] self.traj_dialog.setTrajLimits(0, nframes - 1, time_sec[1] - time_sec[0]) self.updateTrajectoryRangeLabel() return True
[docs] def getFlags(self): """ Get flags for current selected trajectory range """ flags = [] traj_frame_data = self.getInputs() flags += [ '-' + self.trj_min_key, str(traj_frame_data[self.trj_min_key]) ] flags += [ '-' + self.trj_max_key, str(traj_frame_data[self.trj_max_key]) ] # Only add step if non-default if traj_frame_data[self.trj_step_key] != 1: flags += [ '-' + self.trj_step_key, str(traj_frame_data[self.trj_step_key]) ] return flags
[docs] def getRange(self): """ Get current range of frames selected :rtype: tuple(int) :return: lower limit and upper limit of the selected range of frames """ start_val = self.traj_dialog.start_sb.value() end_val = self.traj_dialog.end_sb.value() return start_val, end_val
[docs] def setRange(self, start_val=None, end_val=None): """ Set the frame range (in number and not time) for trajectory selection :type start_val: int :param start_val: lower limit for frame in trajectory selection :type end_val: int :param end_val: upper limit for frame in trajectory selection """ # Set lower limit if start_val is not None: self.traj_dialog.start_sb.setValue(start_val) # Set upper limit if end_val is not None: self.traj_dialog.end_sb.setValue(end_val) # Update the dialog self.traj_dialog.setTempTrajRange() self.updateTrajectoryRangeLabel()
[docs] def setEnabled(self, state): """ Enable or disable the button and label for showing the traj dialog :param bool state: True to enable the button, and False to disable """ self.traj_btn.setEnabled(state) self.traj_label.setEnabled(state)
[docs] def reset(self): """ Reset the frame """ self.traj_dialog.reset() self.traj_label.setText('')
[docs] def getInputs(self): """ Get the minimum, maximum and step value of trajectory frames :return: min,max and step value of trajectory frames :rtype: dict """ return { self.trj_min_key: self.traj_dialog.start_sb.value(), self.trj_max_key: self.traj_dialog.end_sb.value(), self.trj_step_key: self.traj_dialog.getStep() }
[docs]class TrajAnalysisGuiMixin(appbase.BaseAnalysisGui): """ Class for extension of af2 to add widgets to gui for desmond trajectory analysis. """
[docs] def addLoadTrajButton(self, layout, setup_method=None, allow_gcmc=True): """ Load button to load structure from workspace and associated trajectory :param `QLayout` layout: layout to add the button and range selector to :param callable setup_method: The method to call to setup the panel :param bool allow_gcmc: Whether gcmc cms's should be allowed """ self.load_btn = appbase.StructureLoadButton(command=self._importTrjFile, layout=layout) self.trj_range_selector = TrajRangeSelectorFrame(layout=layout) self.setup_method = setup_method self.allow_gcmc = allow_gcmc
def _importTrjFile(self): """ Load the workspace file and get the trajectory path and cms file from the structure. :return `schrodinger.structure.Structure`: Structure that was imported """ self.resetPanel() struct = self.getIncludedEntry() if struct is None: return try: cms_file, trj_dir = get_cms_and_trj_path(struct) except ValueError as err: self.error(str(err)) return if not self.allow_gcmc: cms_model = cms.Cms(cms_file) if cms_model.is_for_gcmc: self.error( 'The input Desmond model is for Grand-Canonical Monte ' 'Carlo, which is not supported by this feature.') return result = self.trj_range_selector.updateTrj(trj_dir) if not result: return self.cms_file = cms_file self.trj_path = trj_dir self.struct = struct # Do any extra setup steps if necessary if self.setup_method: if self.setup_method() is False: # Setup failed. Reset the panel to clear the changes. self.resetPanel() return self.toggleStateMain(True) return struct
[docs] def resetPanel(self): """ Reset the panel variables and widgets set by this mixin :raise NotImplementedError: Will raise error if load button is not added """ try: self.trj_range_selector except AttributeError: raise NotImplementedError('addLoadTrajButton not called.') super().resetPanel() self.cms_file = None self.trj_path = None self.struct = None self.trj_range_selector.reset()
[docs] def toggleStateMain(self, state): """ Enable and disable trajectory traj button and label :param bool state: True to enable the button, and False to disable """ self.trj_range_selector.setEnabled(state)
[docs] def getTrajFlags(self): """ Get the command line flags for cms file, trajectory path, and trajectory range. :return list: A list of command line flags and values """ flags = [] jobname = self.jobname() # Get cms extension cms_ext = fileutils.get_file_extension(self.cms_file) cms_name = jobname + cms_ext shutil.copy2(self.cms_file, cms_name) # Get trj extension trj_ext = traj.get_trajectory_extname(ref_fname=self.trj_path) trj_name = jobname + trj_ext copy_func = (shutil.copytree if os.path.isdir(self.trj_path) else shutil.copy2) copy_func(self.trj_path, trj_name) # Associate trajectory to cms and write it with cms_writer(cms_name) as model: model.set_cts_property(msprops.TRAJECTORY_FILE_PROP, trj_name) flags += [parserutils.FLAG_CMS_PATH, cms_name] flags += [parserutils.FLAG_TRJ_PATH, trj_name] flags += self.trj_range_selector.getFlags() return flags
@af2.validator(-1000) def validateCMS(self): """ Check if valid structure has been loaded. :rtype: bool or (bool, str) :return: The bool is True if file is loaded. False with message pop up if file is not loaded. """ return self._validateCMS() def _validateCMS(self): """ Check if valid structure has been loaded. This is a separate method as jobtasks.preprocessor does not work with a function that returns an af2 validation object. Also it can be used independently of any decorator. :rtype: bool or (bool, str) :return: The bool is True if file is loaded. False with message pop up if file is not loaded. """ if not hasattr(self, 'cms_file') or self.cms_file is None: return (False, 'No structure loaded.') return True
[docs]class TrajectoryCompoundParam(parameters.CompoundParam): """ Class to link the inputs from the panel to the model.task.input object and define input parameters :type cms_file: tasks.Taskfile :param cms_file: The cms file on which trajectory density analysis calculations will be performed :type trj: tasks.TaskFile :param trj: The associated trajectory of the input cms file :type trj_max: int :param trj_max: Maximum value of the trajectory frames :type trj_min: int :param trj_min: Minimum value of the trajectory frames :type trj_step: int or None :param trj_step: Step size of the frames if the frames need to be skipped. None if nothing is skipped """ cms_file: tasks.TaskFile trj: tasks.TaskFile trj_max: int trj_min: int trj_step: int
[docs] def setAttributes(self, inputs): """ Set the attributes of the instance of model.task.input :type inputs: dict :param inputs: inputs from all the stages in the panel """ for key in inputs: setattr(self, key, inputs[key])
[docs]class SubstrateRestraintGroupBox(swidgets.SGroupBox): """ Group box used to define which atoms in a panel's MD system are substrates, at which point Desmond force restraints can be added to those substrate atoms. """
[docs] def __init__(self, panel, parent_layout, *, checkable=False): """ Initialize a group box tailored for defining and restraining substrate atoms. :param MultiDesmondJobApp panel: The Maestro panel that this widget should be associated with :param QBoxLayout parent_layout: The layout to place this SGroupBox into :param bool checkable: Whether the groupbox is checkable, False by default. """ if not isinstance(panel, multiapp.MultiDesmondJobApp): msg = (f'The parent panel for {self.__class__.__name__} must be ' 'a `MultiDesmondJobApp` object.') raise TypeError(msg) super().__init__('Substrate positional restraints', parent_layout=parent_layout, checkable=checkable) self.panel = panel self._layOutASL() self._layOutForceConstant()
def _layOutASL(self): """ Make a layout containing the widgets for the ASL string """ tip = 'The ASL used to identify the substrate atoms' self.asl_layout = swidgets.SHBoxLayout(layout=self.layout, add_stretch=False) self.asl_label = swidgets.SLabel('Use ASL:', layout=self.asl_layout, tip=tip) self.asl_select_btn = swidgets.SPushButton('Select...', command=self.setASL, layout=self.asl_layout, tip=tip) self.asl_le = swidgets.SLineEdit(layout=self.asl_layout, tip=tip) self.asl_clear_btn = swidgets.SPushButton('Clear', command=self.asl_le.reset, layout=self.asl_layout) def _layOutForceConstant(self): """ Make a layout containing widgets for force constants """ units = 'kcal/mol/%s%s' % (swidgets.ANGSTROM, swidgets.SUPER_SQUARED) tip = 'Force constant for the positional restraints of the substrate' self.force_sb = swidgets.SLabeledDoubleSpinBox('Force constant:', after_label=units, layout=self.layout, value=500.0, minimum=1.0, maximum=100000., tip=tip)
[docs] def error(self, msg): """ Raises an error in the parent panel """ self.panel.error(msg)
[docs] def getASL(self): """ Gets the currently-assigned ASL string :rtype: str :return: the ASL string currently entered into the widget """ return self.asl_le.text()
[docs] def getForceConstant(self): """ Gets the currently-assigned force constant :rtype: float :return: the force constant currently entered into the widget, in units of kcal/mol/Ang^2 """ return self.force_sb.value()
[docs] def setASL(self): """ Show ASL window and assign the output to the line edit. """ try: maestro.get_included_entry() except RuntimeError: self.error('There must be one and only one entry included in ' 'the Workspace') return if not self.panel.validateModelLoaded().passed: self.error('Included entry does not appear to be a valid ' 'Desmond system.') return new_asl = maestro.atom_selection_dialog('Set ASL', self.asl_le.text(), resolve_asl_aliases=True) self.asl_le.setText(new_asl) self.checkASL()
[docs] def checkASL(self): """ Check/prevent ASLs that contain entry.id or entry.name """ asl = self.getASL() bad_flags = [ flag for flag in gutils.DISALLOWED_ASL_FLAGS if flag in asl ] if bad_flags: self.error(f'"{bad_flags}" flags in ASL are not supported.') self.asl_le.setText('')
[docs] def setForceConstant(self, force_constant): """ Sets the force constant to a given value :param float force_constant: the force constant you want to set in the widget, in units of kcal/mol/Ang^2 """ self.force_sb.setValue(force_constant)
[docs] def getCommandLineFlags(self): """ Returns the command line flags associated with this widget. :rtype: list(str) :return: Command-line flags and their values. If no ASL is set, then returns an empty list. """ cmd = [] if self.isCheckable() and not self.isChecked(): return cmd restraints_asl = self.asl_le.text().strip() if restraints_asl: cmd += [parserutils.FLAG_SUBSTRATE_ASL, restraints_asl] cmd += [ parserutils.FLAG_SUBSTRATE_FORCE_C, str(self.force_sb.value()) ] return cmd
@af2.validator() def validateASL(self): """ Check that the ASL is valid. :rtype: bool or bool, msg :return: True if everything is OK, (True, msg) if the user should be asked if "msg" is OK to continue. """ asl = self.getASL() if not self.isCheckable() and not asl: return True if self.isCheckable() and not self.isChecked(): return True if not analyze.validate_asl(asl): return False, f'Invalid ASL provided: "{asl}"' if not analyze.evaluate_asl(self.panel._models[0], asl): return False, f'No atoms matched the ASL: "{asl}"' return True
[docs] def reset(self): """ Reset the widget """ self.asl_le.reset() self.force_sb.reset()