# Source code for schrodinger.application.desmond.util

"""
Utility functions and classes

Copyright Schrodinger, LLC. All rights reserved.
"""

import fnmatch
import glob
import gzip
import hashlib
import os
import random
import re
import shutil
import sys
from pathlib import Path
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple

from schrodinger.application.desmond import constants
from schrodinger.application.desmond.constants import FepLegTypes
from schrodinger.application.desmond.constants import SIMULATION_PROTOCOL
from schrodinger.application.desmond.constants import PROTOCOL_TO_POSTFIX
from schrodinger.job import util as jobutil
from schrodinger.structure import Structure
from schrodinger.structure import _StructureAtom

# pwd module is not available for Windows platform.
if (sys.platform == "win32"):
    pass
else:
    import pwd


class TestSuite(object):
    """
    A small and tight class designed for unit tests. With this utility, unit
    test code can be written much faster than with the 'unittest' module.
    """

    def __init__(self, title):
        """
        Sets the title of the suite to 'title' and all internal counters to 0.
        """
        self.__title = title
        self.__n_case = 0
        self.__n_pass = 0
        self.__n_fail = 0
        self.__suite = []

    def __del__(self):
        """
        Finishes testing untested cases and prints a simple summary.
        """
        if 0 < len(self.__suite):
            self.run()
        print("\ntesting summary: %d cases in total, %d passed, %d failed" %
              (self.__n_case, self.__n_pass, self.__n_fail))
        print("-------%s-------\n\n" % self.__title)

    def __lshift__(self, case):
        """
        Adds a case into the suite. Each case must be a tuple of at least two
        elements:

        - 'case[0]' must be one of the following:

          - a boolean value:

            - False - means the test is failed.
            - True  - means the test is passed.

          - or a callable object that can return a boolean value. The object
            will be called later on by this 'TestSuite' object (via calling to
            the 'run' method), and the returned value will be used to judge
            whether the test is passed on the same rule as above. If an
            exception is thrown out of the call to the object, the test will
            be considered failed.

        - 'case[1]' must be a string, representing the name of the case.

        If there are more than two elements, the remaining elements will be
        printed or called (if callable) if the test is failed. This helps
        diagnose the problem.
        """
        self.__suite.append(case)

    def run(self):
        """
        Performs testing of all test cases accumulated so far and print
        information indicating a particular case is passed or failed. The user
        can explicitly call this function as many times as like. Already
        tested cases will not be tested again.

        The user usually does NOT need to call this function explicitly
        because when the 'TestSuite' object is about to be destructed, it will
        automatically check and test any untested cases.
        """
        if 0 == self.__n_case:
            print("-------%s-------" % self.__title)
        for case in self.__suite:
            print("testing case '%s'..." % case[1], end=' ')
            # Bug fix: the original checked `callable(case)` -- `case` is a
            # tuple, so that was always False and callable predicates were
            # never invoked (any callable counted as "passed" by truthiness).
            # The predicate is `case[0]`, which must be checked and called.
            if callable(case[0]):
                try:
                    result = case[0]()
                except Exception:
                    # Per the contract of `__lshift__`: an exception from the
                    # predicate means the case failed.
                    result = False
            else:
                result = case[0]
            self.__n_case += 1
            if result:
                print("passed")
                self.__n_pass += 1
            else:
                print("failed")
                self.__n_fail += 1
                # Extra elements are diagnostics: call them if callable,
                # otherwise print them.
                if 2 < len(case):
                    for item in case[2:]:
                        if callable(item):
                            item()
                        else:
                            print(item)
        self.__suite = []
class Counter(object):
    """
    A self-incrementing counter, originally designed for the convenience of
    gridding widgets. Instead of hardcoding ('row = 1', 'row = 2', ...), one
    writes::

      row_index = Counter()
      my_label1.grid( row = row_index.val, sticky = W )
      my_label2.grid( row = row_index.val, sticky = W )

    Reading the 'val' property returns the current value of the internal
    counter and then increments the counter (not the returned value) by 1. To
    read the current value without changing it, use the 'va_' property or
    explicitly convert the object to 'int'.
    """

    def __init__(self, val=0):
        """
        Constructs the object. One can provide a value 'val' to initialize
        the internal variable, e.g., `Counter( 2 )` starts the count from 2
        instead of 0 (default value).
        """
        self.__val = val

    def __int__(self):
        """
        Supports conversion to an integer.
        """
        return self.__val

    def __cmp__(self, other):
        """
        Supports comparisons with integers or objects convertible to integers.

        NOTE: `__cmp__` is a Python-2 protocol and is never called by
        Python 3; kept for backward compatibility of the class body only.
        """
        return self.__val - int(other)

    def reset(self, val=0):
        """
        Resets the counter to 'val'.
        """
        # Bug fix: the original assigned the literal 0, silently ignoring the
        # `val` argument.
        self.__val = val

    def __get_val(self):
        # Post-increment: return the current count, then bump it.
        self.__val += 1
        return self.__val - 1

    def __get_va_(self):
        return self.__val

    val = property(
        fget=__get_val,
        doc=
        "Readonly. When read, this returns the value of the current count and then increment the count by"
        " 1. The incrementation does not affect the returned value.")
    va_ = property(
        fget=__get_va_,
        doc=
        "Readonly. When read, this returns the value of the current count without changing the internal"
        " state whatsoever of the object.")
def remove_file(basename: str,
                prefix: Optional[List[str]] = None,
                suffix: Optional[List[str]] = None):
    """
    Tries to delete files (or dirs) whose names are composed by the given
    `basename`, a list of prefixes (`prefix`), and a list of suffixes
    (`suffix`). No effects if a file (or dir) does not exist.

    :param basename: Middle part of every name to be deleted.
    :param prefix: Optional list of prefixes; when omitted no prefix is used.
    :param suffix: Optional list of suffixes; when omitted no suffix is used.
    """
    # Treating a missing prefix/suffix list as [""] collapses the original
    # four-way branch (both/prefix-only/suffix-only/neither) into one loop
    # with identical behavior.
    for pre in (prefix if prefix is not None else [""]):
        for suf in (suffix if suffix is not None else [""]):
            fname = pre + basename + suf
            if os.path.isfile(fname):
                os.remove(fname)
            elif os.path.isdir(fname):
                shutil.rmtree(fname)
def write_n_ct(fname, struc):
    """
    Writes a list of CTs to a file with the name as given by 'fname'. The CTs
    in the output file are in the same order as in the list. The list can
    contain None elements, which will be ignored. This function has no effect
    if the 'struc' is an empty list or contains only Nones.

    !!!DEPRECATED!!! Use `struc.write_structures` instead.
    """
    cts = [ct for ct in struc if ct is not None]
    if not cts:
        return
    first, *rest = cts
    first.write(fname, format="maestro")
    for ct in rest:
        ct.append(fname, format="maestro")
def chdir(dir_name):
    """
    Changes the current directory to the one of the name 'dir_name'. If
    'dir_name' is '..', it is mapped to `os.pardir` so that moving to the
    parent directory is done in a portable way.
    """
    target = os.pardir if dir_name == ".." else dir_name
    os.chdir(target)
def parent_dir(dir_name, up=1):
    """
    Returns the parent directory name.

    :param up: This should be a non-negative integer value indicating the
        parent along the path. Default value is 1, indicating the immediate
        parent. Value 2, for example, indicates the parent of the immediate
        parent directory.
    """
    result = dir_name
    while up > 0:
        result = os.path.dirname(result)
        up -= 1
    return result
def relpath(xpath, refpath=None):
    """
    Given two paths ('xpath' and 'refpath'), returns the relative path of
    'xpath' with respect to 'refpath'. Both 'xpath' and 'refpath' can be
    relative or absolute paths, and 'refpath' defaults to the current
    directory if it is not provided.

    Returns "" when both resolve to the same directory (unlike
    `os.path.relpath`, which would return ".").
    """
    if refpath is None:
        refpath = os.getcwd()
    # Compare canonicalized component lists; empty components (leading slash,
    # doubled separators) are dropped.
    xparts = [p for p in os.path.realpath(xpath).split(os.path.sep) if p]
    rparts = [p for p in os.path.realpath(refpath).split(os.path.sep) if p]
    common = 0
    for xp, rp in zip(xparts, rparts):
        if xp != rp:
            break
        common += 1
    # Climb out of the non-shared part of refpath, then descend into xpath.
    pieces = [".."] * (len(rparts) - common) + xparts[common:]
    return os.path.join(*pieces) if pieces else ""
def is_subdir(xpath, refpath=None):
    """
    Given two paths ('xpath' and 'refpath'), returns True if 'xpath' is a
    direct or indirect subdirectory of 'refpath'. Also returns True if
    'xpath' and 'refpath' are the same. Both can be relative or absolute
    paths, and 'refpath' defaults to the current directory if not provided.
    """
    # A relative path that starts with ".." must climb out of refpath, so
    # xpath cannot be inside it.
    return not relpath(xpath, refpath).startswith("..")
def append_comment(fname, comment):
    """
    Appends a string 'comment' to a file 'fname'. A char '#' will be
    automatically added to the head of the string. Any absolute path to
    "utilities/multisim" in the comment is normalized to
    "$SCHRODINGER/utilities/multisim".

    :param comment: A string or a list of strings. If it is a list of
        strings, each string will be appended as a separate comment.
    :raise IOError: If 'fname' does not exist.
    """
    if not os.path.isfile(fname):
        raise IOError("File not found: %s" % fname)
    # Normalize to a list so single strings and lists share one code path;
    # `isinstance` replaces the original exact-type check `__class__ == list`.
    comments = comment if isinstance(comment, list) else [comment]
    # `with` guarantees the handle is closed even if a write fails (the
    # original left the file open on error).
    with open(fname, "a") as fh:
        for c in comments:
            print("# " + re.sub(".*/utilities/multisim",
                                "$SCHRODINGER/utilities/multisim", c),
                  file=fh)
def random_string(
        n,
        char_pool="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890"
):
    """
    Returns a random string with 'n' chars. The 'n' chars will be taken from
    a pool of chars as given by 'char_pool'.

    :param n: Length of the returned string.
    :param char_pool: Candidate characters to draw from.
    """
    # Bug fix: the original used random.randint(1, len(char_pool) - 1) as the
    # index, so the first character of the pool could never be selected.
    # random.choice samples the entire pool uniformly.
    return "".join(random.choice(char_pool) for _ in range(n))
def getlogin():
    """
    Returns the login name if it can be found, or otherwise
    'unidentified_user'.

    Note: 'os.getlogin' has a bad dependency on the terminal in use and can
    raise when the terminal does not provide the login information. This
    function tries several sources in turn (order chosen per DESMOND-2899)
    and falls back to a fixed placeholder string if all of them fail.
    """
    name = None
    if sys.platform == 'win32':
        name = os.environ.get('USERNAME')
    else:
        # Preferred: the password database entry for the real uid.
        try:
            name = pwd.getpwuid(os.getuid())[0]
        except (
                KeyError,
                NameError,
        ):
            # Fall back to the controlling terminal, then the environment.
            try:
                name = os.getlogin()
            except OSError:
                name = os.environ.get("USER")
    return "unidentified_user" if name is None else name
def _strip_bracket(s): """ """ br_open = None br_close = None has_dollar = False i = 0 mask = [1] * len(s) for c in s: if (c == '['): br_open = i br_close = None has_dollar = False elif (c == ']' and br_open is not None): br_close = i if (has_dollar): for j in range(br_open, br_close + 1): mask[j] = 0 else: mask[br_open] = 0 mask[br_close] = 0 br_open = None elif (c == '$'): has_dollar = True i += 1 ret = "" for c, m in zip(s, mask): if (m): ret += c return ret
def expand_macro(s, macro_dict):
    """
    Replaces the macros in the string 's' using the values given by the macro
    dictionary 'macro_dict'. The expanded string will be returned.

    Macro conventions:

    - All macros should start with a single '$', followed by capital letters,
      e.g., "$JOBNAME", "$USERNAME".
    - Optional macros should be bracketed by '[]', e.g.,
      "myjob[_lambda$LAMBDANO]", where "$LAMBDANO" is an optional macro.
    - If optional macros are not expanded, the bracketed part of the string
      will be discarded.
    - Macro values should not contain the following chars: '[', ']', and '$'.
    """
    for key, value in macro_dict.items():
        s = s.replace(key, str(value))
    return _strip_bracket(s)
# DEPRECATED!!! Use `exit(msg)` instead.
def fatal(msg: str) -> None:
    """
    Terminates the process by raising `SystemExit` with the given message.
    (Deprecated; see the note above: use `exit(msg)` instead.)
    """
    raise SystemExit(msg)
# DEPRECATED!!! Use `verify_file_exists` instead.
def ensure_file_exists(fname: str):
    """
    Ensure that the file exists and is not empty. A falsy `fname` (e.g. empty
    string) is silently accepted.

    :raise SystemExit: If the file is not found or is empty.
    """
    if not fname:
        return
    verify_file_exists(fname, exit_on_error=True)
def verify_file_exists(fname_pattern: str, exit_on_error=False) -> str:
    """
    Verifies that a single file/path matching `fname_pattern` actually exists
    and its size is not zero, and returns the actual file name. If the
    verification failed -- no files found, multiple files found, or the file
    is empty -- raises an `IOError` (or `SystemExit` if `exit_on_error` is
    true).
    """
    error_cls = SystemExit if exit_on_error else IOError
    matches = glob.glob(fname_pattern)
    if not matches:
        msg = f"ERROR: No files found matching pattern: {fname_pattern}"
    elif len(matches) > 1:
        msg = ("ERROR: Multiple files found matching pattern "
               "`%s`: %s" % (fname_pattern, ", ".join(matches)))
    else:
        found = matches[0]
        if os.path.getsize(found):
            return found
        msg = f"ERROR: File is empty: {found}"
    # FIXME: When the file is not found, it's quite desirable to take a look
    # at the parent directory and know what files are present and their
    # sizes. This can be more complicated than listing the contents of the
    # dir because the path might be somehow mistaken along the way.
    raise error_cls(msg)
def verify_traj_exists(fname_pattern: str) -> str:
    """
    Verifies that one and only one trajectory file (which may be a regular
    file or a directory, depending on the trajectory format) matching the
    given file name pattern `fname_pattern` actually exists, and returns the
    name of the trajectory file if it's found, or raises an `IOError`
    otherwise. `fname_pattern` follows the `glob` syntax. If the pattern
    doesn't contain a supported trajectory extension name, it will be treated
    as the pattern of the base name.
    """
    from schrodinger.application.desmond.packages import traj_util

    matches = traj_util.find_trajectories(fname_pattern)
    if len(matches) == 1:
        return matches[0]
    if matches:
        err_msg = ("ERROR: Multiple trajectory files found matching pattern: "
                   "'%s'\n %s" % (fname_pattern, "\n ".join(matches)))
    else:
        err_msg = ("ERROR: No trajectory files found matching pattern: '%s'" %
                   fname_pattern)
    raise IOError(err_msg)
def time_duration(start_time, end_time, scale=1):
    """
    Given the start time and the end time, returns a string that says the
    duration between the two time points in the format of 'xh y' z"', where
    'x', 'y', and 'z' are hours, minutes, and seconds, respectively.

    :param start_time: A time in seconds since the Epoch (i.e. a value as
        returned by the 'time.time()' function).
    :param end_time: A time in seconds since the Epoch (i.e. a value as
        returned by the 'time.time()' function).
    :param scale: Multiplier applied to the raw difference before formatting.
    """
    elapsed = (end_time - start_time) * scale
    hours = int(elapsed / 3600.0)
    remainder = elapsed - hours * 3600.0
    minutes = int(remainder / 60.0)
    seconds = int(remainder - minutes * 60.0)
    return "%sh %s' %s\"" % (hours, minutes, seconds)
def get_product_info(product_name):
    """
    Returns a tuple with the following elements:

    - 0 - the exec dir
    - 1 - the lib dir
    - 2 - the version number
    - 3 - the platform

    All elements are strings. The exec dir comes from the environment
    variable "<PRODUCT_NAME>_EXEC" when set, otherwise from
    `jobutil.hunt`; the other elements are derived from the exec dir path,
    which is assumed to look like ".../<product>-v<version>/bin/<platform>".

    :raise Exception: If the exec dir cannot be determined.
    """
    product_name = product_name.lower()
    PRODUCT_EXEC = product_name.upper() + "_EXEC"
    exec_dir_re = re.compile("(.*" + product_name + "-v)" +
                             "([0-9.]*)/bin/(.*)")
    if PRODUCT_EXEC in os.environ:
        exec_dir = os.environ[PRODUCT_EXEC]
    else:
        # Gets the "exec" directory by "hunting".
        try:
            exec_dir = jobutil.hunt(product_name)
        except Exception:
            # Bug fix: the original message always said "MMSHARE_EXEC"
            # regardless of which product was being looked up.
            raise Exception("Could not determine %s." % PRODUCT_EXEC)
    lib_dir = exec_dir_re.sub(r"\1\2/lib/\3", exec_dir)
    ver = exec_dir_re.sub(r"\2", exec_dir)
    plat = exec_dir_re.sub(r"\3", exec_dir)
    return (
        exec_dir,
        lib_dir,
        ver,
        plat,
    )
def html_embed_image(url):
    """
    Returns an HTML <img> tag embedding the given `url`, with '&' characters
    escaped to '&amp;'.
    """
    escaped = url.replace("&", "&amp;")
    return "<img hspace=20 vspace=20 src=\"%s\"/>" % (escaped,)
def unique(seq: Iterable):
    """
    Iterates over a given sequence `seq` in the same order. If there are
    duplicate elements, only the first occurence is preserved. For example::

      [1, 2, 3, 3, 4, 3, 4, 0] ==> [1, 2, 3, 4, 0]

    This function requires that all elements in `seq` are hashable. The
    result is produced lazily.
    """
    seen = set()
    for item in seq:
        if item not in seen:
            seen.add(item)
            yield item
# DEPRECATED!!! Use `traj_util.find_trajectories` instead.
def get_traj_filename(basename: str) -> Optional[str]:
    """
    Return the first existing trajectory path among "<basename>_trj" and
    "<basename>.xtc" (checked in that order), or None when neither exists.
    (Deprecated; see the note above: use `traj_util.find_trajectories`.)
    """
    # FIXME: this is an ugly solution to get the existing trajectory
    # file/folder; ideally the trajectory format would be passed from
    # upstream. NOTE: if both _trj and .xtc exist, _trj wins.
    for ext in ('_trj', '.xtc'):
        candidate = basename + ext
        if os.path.exists(candidate):
            return candidate
    return None
def parse_res(res):
    """
    Return (chain, resnum, inscode) from residue name in the form
    <chain>:<resnum><inscode>

    <chain> is the chain id, or a '_' or no character if space
    <resnum> is the residue number (possibly negative)
    <inscode> is the pdb insertion code

    Examples: A:12  :23A  B:-1  _:12

    :raise ValueError: If `res` does not match the expected form.
    """
    match = re.search(r"(.{0,1}):(-{0,1}\d+)(.{0,1})", res)
    if match is None:
        raise ValueError(
            "Input should be in the form <chain>:<resnum><inscode>")
    chain, resnum, inscode = match.groups()
    # '_' and the empty string both denote a blank chain id.
    if chain in ("_", ""):
        chain = " "
    return chain, int(resnum), inscode
def parse_edge_file(fname: str) -> List[Tuple[str, str]]:
    """
    Rules:

    1. An edge is identified by its ID, which is a string of the two node IDs
       separated by '_'.
    2. Each node ID can be either a full ID or a short ID.
    3. Each line in the .edge file should contain at most 1 edge ID.
    4. Lines containing only white spaces are allowed, and they will be
       ignored by the parser.
    5. A comment is a string that starts with '#' and ends with '\\n', and it
       will be ignored by the parser.

    :return: A list of edge IDs from the parsed edge file.
    """
    edges = []
    with open(fname) as fh:
        for raw_line in fh.readlines():
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            # Drop a trailing comment, then normalize the separators '-' and
            # ':' to '_'.
            body = stripped.split('#')[0].replace("-", "_").replace(":", "_")
            node_ids = tuple(nid.strip() for nid in body.split("_"))
            # NOTE(review): re.match only anchors at the start, so a node ID
            # with trailing junk (e.g. "36da5adXY") would pass -- confirm
            # whether fullmatch was intended.
            is_valid = (len(node_ids) == 2 and all(
                re.match('([a-f]|[0-9]){7,40}', s) for s in node_ids))
            if not is_valid:
                fatal("ERROR: Edge file %s is NOT in the right format.\n" \
                      "An example for the format of an edge-file:\n" \
                      " 36da5ad:397128e\n" \
                      " 33dd5ad:347118e\n" \
                      " 33fe5ad:3171f8e\n" \
                      "Each line specifies an edge with the two node's IDs. Each node ID is a hex" \
                      " number of at least 7 digits. The two IDs are separated by a '_' (or ':' or" \
                      " '-')." % fname)
            edges.append(node_ids)
    return edges
def parse_ligand_file(fname: str) -> List[str]:
    r"""
    Parse a ligand file with the following format:

    1. On each line, a ligand is identified by the hash id
    2. Lines containing only white spaces are allowed, and they will be
       ignored by the parser.
    3. Each line in the .ligand file should contain at most 1 ligand ID.
    4. A comment is a string that starts with '#' and ends with '\n', and it
       will be ignored by the parser.

    :return: A list of structure hash ids.
    :raise ValueError: If a non-blank, non-comment line is not a hash id.
    """
    ligands = []
    with open(fname) as fh:
        for raw_line in fh.readlines():
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            # Keep only the part before any trailing comment.
            lig_id = stripped.split("#")[0].strip()
            if not lig_id or not re.match('([a-f]|[0-9]){7,40}', lig_id):
                raise ValueError(
                    f"ERROR: Unable to parse .ligand file - {fname}\n"
                    f"Line '{stripped}' invalid.\n"
                    f"An example for the format of a ligand-file:\n"
                    f" 36da5ad # ligand1\n"
                    f" 347118e\n"
                    f"Each line specifies a ligand ID with an optional comment "
                    f"following a #\n")
            ligands.append(lig_id)
    return ligands
def write_ligand_file(fname: str, cts: List[Structure]) -> None:
    """
    Given a list of structures, write a file containing ligand hash ids, one
    per line, each followed by the structure title as a comment.

    :param fname: Path for output.
    :param cts: List of structures.
    """
    lines = [f'{str2hexid(ct.title)} # {ct.title}\n' for ct in cts]
    with open(fname, 'w') as out:
        out.writelines(lines)
def str2hexid(s: str, full_id=False) -> str:
    """
    Returns a unique hex code for the given string `s`. The chances of
    returning the same hex code for different values of `s` is low enough
    (though not zero in principle) that the returned hex code can serve as an
    ID of the input string.

    By default, the returned hex code is 7 digits long, which is only the
    initial part of the full 40-digit SHA-1 digest. To get the full digest,
    set `full_id=True`.
    """
    digest = hashlib.sha1(s.encode('utf-8')).hexdigest()
    if full_id:
        return digest
    return digest[:7]
def _key2flags(k: str) -> List[str]: """ Converts a key of a function's keyword argument into a command line flag. If `k` does NOT have a leading dash, a dash will be prepended. If `k` has underscores, two flags will be returned: The first has the underscores, the second has the dashes converted from the underscores. """ if k.startswith("DASHFLAG_"): return ['-' + k[9:].replace('_', '-')] elif k.startswith("UNDERSCOREFLAG_"): return ['-' + k[15:]] flags = [k, k.replace("_", "-")] if '_' in k else [k] return flags if k.startswith('-') else ['-' + e for e in flags]
def check_command(cmd: List, *args, **kwargs):
    """
    Check the command line arguments against `args` for positional arguments
    and `kwargs` for keyword arguments.

    For flags like -multiword-flag, the corresponding keyword in `kwargs` is
    `multiword_flag` . This function by default does NOT distinguish a
    -multiword-flag from a -multiword_flag. Both forms are considered valid
    flags. If you want to force a dash flag, prefix the keyword with
    `"DASHFLAG_"` , e.g., "DASHFLAG_multiword_flag"; to force a underscore
    flag, prefix the keyword with `"UNDERSCOREFLAG_"`

    For user's convenience, one can use any types of values in `args` and
    `kwargs`. The value will be converted to `str` before checking if it
    exists in `cmd`. For example, you can use this key-value pair in
    `kwargs`: `maxjob=1`, which is equivalent to `maxjob='1'`.

    For keyword arguments that take multiple values, e.g., "-d
    stage_1-out.tgz -d stage_2-out.tgz", the values in `kwargs` should be
    specified in a list, e.g., `d=["stage_1-out.tgz", "stage_2-out.tgz"]`.

    For keyword arguments that take NO values, use `None` as the value in
    `kwargs`.

    :param cmd: Must satisfy the following requirements:

        1. All argument flags should be single dash flags.
        2. If an argument that takes a single value but specified multiple
           times in `cmd` the right-most specification is in effect.

    :raises AssertionError: If any arguments as specified by `args` and
        `kwargs` are NOT found in `cmd`.
    """
    # Positional arguments: each must appear verbatim after str conversion.
    for arg in args:
        assert str(arg) in cmd, str(arg)
    # Reversed copy used below so that .index() finds the RIGHT-most
    # occurrence of a duplicated flag.
    reversed_cmd = cmd[-1::-1]
    for k, v in kwargs.items():
        # Candidate spellings of the flag (underscore and/or dash forms).
        flags = _key2flags(k)
        if isinstance(v, list):
            # Multivalue argument. Note it's legitimate for `values` to have
            # duplicate elements, and so we cannot use `set` here.
            # NOTE(review): if a matching flag is the LAST element of `cmd`,
            # `cmd[i + 1]` raises IndexError -- confirm callers never build
            # such commands.
            values = [cmd[i + 1] for i, flag in enumerate(cmd) if flag in flags]
            # Ensures all elements in `v` exist in `values`.
            for e in v:
                assert e in values, f'{e} not in {values}'
                # If we have duplicate elements in `v`, we have the same number
                # of duplicates in `values`.
                i = values.index(e)
                del values[i]
            # After removing every expected value, nothing may remain.
            assert not(values), \
                "command has %d more values for flag: %s" % \
                (len(values), flags[0])
        else:
            # For duplicate arguments the right-most one is significant.
            for flag in flags:
                if flag in cmd:
                    i = reversed_cmd.index(flag)
                    if v is not None:
                        # The flag cannot be the last token (i > 0 in the
                        # reversed list), and the token after it (before it in
                        # reversed order) must equal the expected value.
                        assert i > 0
                        assert reversed_cmd[i - 1] == str(
                            v), f'{reversed_cmd[i - 1]} != {str(v)}'
                    break
            else:
                # No spelling of the flag was found at all.
                assert False, f"command has no flags: ({flags})"
def _commandify_segment(raw_cmd_seg: List) -> List[str]: """ Processes a sublist of the raw command. Examples:: ['-dew-asl', 'ligand'] ==> ['-dew-asl', 'ligand'] ['-n', 200] ==> ['-n', '200'] ['-fep-lambda', None] ==> [] ['-fep-lambda', 1] ==> ['-fep-lambda', '1'] ['-transpose-box', True] ==> ['-transpose-box'] ['-transpose-box', False] ==> [] ['-fep-lambda', None, ['-protein-fep', True]] ==> [] ['-fep-lambda', 1, ['-protein-fep', True]] ==> ['-fep-lambda', '1', '-protein-fep'] # -start-interval expects either a pair of values or a single value. ['-start-interval', [[0], interval]] ==> ['-start-interval', 0, interval] ['-start-interval', [[None], interval]] ==> ['-start-interval', interval] """ cmd_seg = [] for e in raw_cmd_seg: if e is None: return [] if isinstance(e, list): cmd_seg += _commandify_segment(e) else: cmd_seg.append(str(e)) if 2 == len(cmd_seg): if cmd_seg[1] == str(False): # This is a switch argument that should NOT be specified. return [] if cmd_seg[1] == str(True): # This is a switch argument that should be specified. return cmd_seg[:1] return cmd_seg
def commandify(raw_cmd: List) -> List[str]:
    """
    A `subprocess` command is a list of strings. This function makes command
    composition less boiler-plated via the following grammar:

    1. A "raw command" is a list of arbitrary types of objects.
    2. Positional arguments are direct, string-convertible elements of the
       raw command; a `None` element is dropped.
    3. A keyword argument is a `list`: first the flag (any string-convertible
       type), then the value(s). If any value is `None`, the whole argument
       is dropped.
    4. A switch argument is a two-element list whose second element is a
       boolean: `True` keeps the bare flag, `False` drops the argument.

    :return: The `subprocess`-ready command (a list of strings).
    """
    result = []
    for item in raw_cmd:
        if item is None:
            continue
        if isinstance(item, list):
            result.extend(_commandify_segment(item))
        else:
            result.append(str(item))
    return result
def use_custom_oplsdir(st):
    """
    Determines if the given structure was marked by the System Build panel(s)
    to indicate that the custom OPLSDIR in Maestro preferences should be
    used.

    :param st: structure whose properties are to be queried
    :type st: structure.Structure

    :return: whether to use the Maestro preference custom OPLSDIR
    :rtype: bool
    """
    # Backwards compatibility: the mere presence (any truthy value) of the
    # legacy property "s_ffio_custom_opls_dir" is treated the same as
    # USE_CUSTOM_OPLSDIR=True.
    return bool(st.property.get(constants.USE_CUSTOM_OPLSDIR)) or \
        bool(st.property.get("s_ffio_custom_opls_dir"))
def gz_fname_if_exists(fname: str):
    """
    Return "<fname>.gz" if that gzipped variant exists on disk, otherwise
    return `fname` unchanged.
    """
    gz_name = fname + '.gz'
    return gz_name if Path(gz_name).exists() else fname
def copy_and_compress_files(src_dir: str, dest_dir: str, compress_pattern=None):
    """
    Copy the files from `src_dir` to `dest_dir`, optionally compressing a
    subset of files.

    :param compress_pattern: Optional, files that match the pattern will be
        gzip compressed and renamed to have a .gz extension.
    """

    def _copy_one(src, dest, **kwargs):
        # Matching files are gzipped (level 1: favor speed over ratio) and
        # written with a .gz suffix; everything else is a metadata-preserving
        # copy.
        if compress_pattern and fnmatch.fnmatch(src, compress_pattern):
            payload = gzip.compress(Path(src).read_bytes(), compresslevel=1)
            Path(str(dest) + '.gz').write_bytes(payload)
        else:
            shutil.copy2(src, dest, **kwargs)

    shutil.copytree(src_dir,
                    dest_dir,
                    dirs_exist_ok=True,
                    copy_function=_copy_one)
def get_leg_name_from_jobname(jobname: str) -> str:
    """
    Derive the FEP leg name from a jobname whose underscore-separated suffix
    encodes the leg. For sublimation/solvation the leg name keeps the
    trailing index ("<leg>_<idx>"); for the fragment-hydration legs the full
    leg constant is returned; otherwise the last underscore field is the leg
    name.
    """
    parts = jobname.split("_")
    if parts[-2] in (FepLegTypes.SUBLIMATION, FepLegTypes.SOLVATION):
        # legname will be in form "sublimation_<idx>", e.g. "sublimation_1"
        return "_".join(parts[-2:])
    # NOTE(review): if RESTRAINED_FRAGMENT_HYDRATION ends with the
    # FRAGMENT_HYDRATION string, the first endswith() below would shadow the
    # second -- confirm the constants make these branches distinct.
    if jobname.endswith(FepLegTypes.FRAGMENT_HYDRATION):
        # legname is 'solvent_fragment_hydration'
        return FepLegTypes.FRAGMENT_HYDRATION
    if jobname.endswith(FepLegTypes.RESTRAINED_FRAGMENT_HYDRATION):
        # legname is 'solvent_restrained_fragment_hydration'
        return FepLegTypes.RESTRAINED_FRAGMENT_HYDRATION
    return parts[-1]
def get_leg_type_from_jobname(jobname: str) -> str:
    """
    Derive the FEP leg type from a jobname. Leg name and leg type coincide
    for most legs; sublimation/solvation leg names carry a trailing index
    ("<leg>_<idx>") that is stripped here.
    """
    legname = get_leg_name_from_jobname(jobname)
    for leg_type in (FepLegTypes.SUBLIMATION, FepLegTypes.SOLVATION):
        if legname.startswith(leg_type):
            return leg_type
    return legname
[docs]def get_msj_filename(jobname: str, leg: Optional[FepLegTypes] = None, protocol: Optional[SIMULATION_PROTOCOL] = None, extend: Optional[bool] = False) -> str: """Return the standardized .msj filename as a string.""" fname = jobname if protocol: fname += PROTOCOL_TO_POSTFIX[protocol] if leg: fname += f"_{leg}" if extend: fname += ".extend" return f"{fname}.msj"
def is_dummy_structure(st: Structure) -> bool:
    """
    Return whether the structure is a dummy structure.

    `constants.DUMMY_LIGAND` is the current way to mark a structure but we
    also check for the deprecated `constants.ABFEP_DUMMY_LIGAND`.
    """
    for prop in (constants.ABFEP_DUMMY_LIGAND, constants.DUMMY_LIGAND):
        if st.property.get(prop) == 1:
            return True
    return False
def make_structure_dummy(st: Structure) -> _StructureAtom:
    """
    Mark structure as the dummy and add a dummy atom. This is needed for FEP
    simulations which use the Graph format but don't have traditional lambda
    0 and lambda 1 inputs (e.g. Absolute Binding, Solubility).

    :return: The newly added placeholder atom.
    """
    st.title = "dummy"
    st.property[constants.DUMMY_LIGAND] = 1
    # Single placeholder atom at the origin.
    dummy_atom = st.addAtom('Na', 0.0, 0.0, 0.0)
    dummy_atom.formal_charge = 0
    dummy_atom.pdbres = 'DU '
    return dummy_atom
def predict_memory_utilization(fep_type: constants.FEP_TYPES, num_atoms: int,
                               num_windows: int) -> Tuple[int, int]:
    """
    Predict the cpu and gpu memory utilization in MB for an fep job, from
    linear (slope, intercept) models over num_atoms * num_windows. Covalent
    and selectivity FEP types use their own fitted coefficients.
    """
    covalent_like = [constants.FEP_TYPES.COVALENT_LIGAND] + list(
        constants.SELECTIVITY_FEP_TYPES)
    if fep_type in covalent_like:
        cpu_slope, cpu_intercept = constants.COVALENT_CPU_SLOPE_INTERCEPT
        gpu_slope, gpu_intercept = constants.COVALENT_GPU_SLOPE_INTERCEPT
    else:
        cpu_slope, cpu_intercept = constants.CPU_SLOPE_INTERCEPT
        gpu_slope, gpu_intercept = constants.GPU_SLOPE_INTERCEPT
    size = num_atoms * num_windows
    cpu_mem = cpu_slope * size + cpu_intercept
    gpu_mem = gpu_slope * size + gpu_intercept
    return cpu_mem, gpu_mem