Source code for schrodinger.utils.fileutils

"""
A module of file utilities to deal with common file issues.

NOTE: This module is used in scripts that need to be able to run without
a Schrodinger license, and therefore can't depend on the pymmlibs.

The force_remove and force_rename functions deal with the fact that
os.remove() and os.rename() don't work on Windows if write permissions are
not enabled.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import backoff
import csv
import ctypes
import errno
import glob
import gzip
import hashlib
import io
import itertools
import ntpath
import os
import re
import shutil
import stat
import sys
import tarfile
import zipfile
import tempfile
import unicodedata
import warnings
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from schrodinger.infra import mm
from schrodinger.utils import csv_unicode
from schrodinger.utils import subprocess

XYZ_EXT = '.xyz'
SDF_EXT = '.sdf'

FORCE_REMOVE_BACKOFF_INTERVAL = 0.2
FORCE_REMOVE_BACKOFF_MAX_TIME = 1

# winerror module used to get ERROR_SHARING_VIOLATION code for _force_remove
# _winreg module is required to query registry for PyMOL installation.
if sys.platform == 'win32':
    try:
        import winerror
        import winreg
    except ImportError:
        winerror = None
        winreg = None

# Constants for use with get_directory() or get_directory_path()
(HOME, APPDATA, LOCAL_APPDATA, USERDATA, TEMP, DESKTOP, DOCUMENTS,
 NETWORK) = (mm.DirectoryName_MMFILE_HOME, mm.DirectoryName_MMFILE_APPDATA,
             mm.DirectoryName_MMFILE_LOCAL_APPDATA,
             mm.DirectoryName_MMFILE_USERDATA, mm.DirectoryName_MMFILE_TEMP,
             mm.DirectoryName_MMFILE_DESKTOP, mm.DirectoryName_MMFILE_DOCUMENTS,
             mm.DirectoryName_MMFILE_NETWORK)

fsenc = sys.getfilesystemencoding()

SCHRODINGER_ENVVAR_STR = \
    "%SCHRODINGER%" if sys.platform == 'win32' else "$SCHRODINGER"

SCHRODINGER_RUN_STR = os.path.join(SCHRODINGER_ENVVAR_STR, 'run')

# Used in appframework.py, af2.py, config_dialog.py, and multiapp.py:
INVALID_JOBNAME_ERR = (
    'Invalid job name: "%s"\nJob name may not be blank or '
    'contain spaces, special symbols, leading hyphens, or leading periods.')

# constants to extend windows filepath if it surpasses max length
WINDOWS_EXTENDED_PATH_TAG = '\\\\?\\'
WINDOWS_MAX_PATH = 260

#===============================================================================
# File deletion and renaming
#===============================================================================


[docs]class SharingViolationError(PermissionError):
    pass


@backoff.on_exception(
    backoff.constant,
    SharingViolationError,
    interval=lambda: FORCE_REMOVE_BACKOFF_INTERVAL,
    max_time=lambda: FORCE_REMOVE_BACKOFF_MAX_TIME,
)
def _force_remove(filename):
    """
    Attempt to remove a single file. Meant to be called by `force_remove`.

    :param filename: The file path.

    :return: NoneType
    """
    try:
        os.remove(filename)
    except OSError as e:
        if e.errno == errno.ENOENT:
            pass
        elif e.errno == errno.EACCES:
            if winerror and e.winerror == winerror.ERROR_SHARING_VIOLATION:
                raise SharingViolationError(e) from None
            os.chmod(filename, stat.S_IREAD | stat.S_IWRITE)
            os.remove(filename)
        else:
            raise

[docs]def force_remove(*args):
    """
    Remove each file in 'args' in a platform independent way without an
    exception, regardless of presence of the file or the lack of write
    permission.

    :param args: the pathname for the files to remove
    :type args: str
    """
    filenames = args
    for filename in filenames:
        _force_remove(filename)

[docs]def force_rmtree(dirname: Union[str, Path], ignore_errors: bool = False):
    """
    Remove the directory 'dirname', using force_remove to remove any
    difficult to remove files or sub-directories.

    :param dirname: the directory to remove
    :param ignore_errors: If True, silently ignore errors, otherwise raise
        OSError
    """
    if not os.path.exists(dirname):
        return

    def remove_helper(func, path, excinfo):
        """
        Helper function for passing into rmtree that calls force_remove
        """
        if func in (os.remove, os.unlink):
            force_remove(path)
        elif func == os.rmdir:
            os.rmdir(path)
        else:
            raise

    shutil.rmtree(dirname, ignore_errors=ignore_errors, onerror=remove_helper)

[docs]def force_rename(old: Union[Path, str], new: Union[Path, str]):
    """
    Rename a file, even if a file at the new name exists, and even if that
    file doesn't have write permission, and even if old and new are on
    different devices.

    :param old: Path to the file source.
    :param new: Path to the file destination.

    :note: Renaming may not be an atomic operation. If the 'new' file exists
        then it is first removed then renamed in two operations. Similarly,
        if old and new are not on the same device then the file is copied to
        'new' then the 'old' file is removed.
    """
    try:
        os.rename(old, new)
    except OSError as e:
        if e.errno == errno.EEXIST:
            force_remove(new)
            os.rename(old, new)
        elif e.errno == errno.EXDEV:
            # ev115508
            # os.rename can't rename across filesystem devices. Catch the
            # error and use shutil.move to copy to 'new' then remove 'old'.
            shutil.move(old, new)
        else:
            raise

[docs]def force_copy2(*args):
    """
    Same as shutil.copy2 but don't raise shutil.SameFileError.
    """
    try:
        shutil.copy2(*args)
    except shutil.SameFileError:
        pass

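# Editorial usage sketch (hypothetical filenames, not part of the original
# module): the force_* helpers succeed even when targets are missing or
# read-only on Windows.
#   >>> force_remove('run1.log', 'run1.okay')
#   >>> force_rename('run1.mae', 'run1.bak.mae')
#   >>> force_rmtree('scratch_dir', ignore_errors=True)
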
#===============================================================================
# Filename parsing
#===============================================================================

[docs]def splitext(p: str) -> Tuple[str, str]:
    r"""
    Split the extension from a pathname.

    Returns "(root, ext)". Equivalent to os.path.splitext(), except that for
    gzip compressed files, such as \*.mae.gz files, ".mae.gz" is split off
    instead of ".gz". The same applies to \*.sdf.gz, \*.sd.gz, \*.mol.gz,
    and the other compressed extensions listed below.

    :param p: a pathname

    :return: The root filename and the file extension.
    """
    pl = p.lower()
    special_cases = ('-comdef.tar.gz', '.mae.gz', '.cms.gz', '.sdf.gz',
                     '.mol.gz', '.tar.gz', '.smi.gz', '.pdb.gz', '.ent.gz',
                     '.sd.gz', '.cif.gz', XYZ_EXT, '.csv.gz')
    for ext in special_cases:
        if pl.endswith(ext):
            split_pos = -len(ext)
            return p[:split_pos], p[split_pos:]
    return os.path.splitext(p)

PDB, MOL2, SD, MAESTRO = "pdb", "mol2", "sd", "maestro"
CLUSTAL = "clustal"
SMILES, SMILESCSV, CIF, PHASE_HYPO = "smiles", "smilescsv", "cif", "phasehypo"
CMS, MAESTRO_STRICT = "cms", "maestro_strict"
SeqFormat = Enum("SeqFormat",
                 ["fasta", "swissprot", "gcg", "embl", "pir", "clustal", "csv"])
PFX = "pfx"
XYZ = 'xyz'

# Used for specifying the .mae, .maegz, and .mae.gz extensions to the exclusion
# of other "MAESTRO" extensions in the EXTENSIONS dict below.
_MAE_EXTS = ['.mae', '.mae.gz', '.maegz']
_CMS_EXTS = ['.cms', '.cms.gz', '.cmsgz']

# There is a test, testGoStructureExtensions() in fileutils_test.py,
# which asserts that all these extensions are known to postmortem.go.
# If an extension is added here, it needs to be added to the structureSuffixes
# map in postmortem, or the test in question will fail.
EXTENSIONS = {
    PDB: [
        '.pdb',
        '.ent',
        # ev90221: Add support for compressed pdb (.pdb.gz, .pdbgz, .ent.gz,
        # .entgz).
        '.pdb.gz',
        '.pdbgz',
        '.ent.gz',
        '.entgz',
    ],
    MOL2: ['.mol2'],
    SD: [
        '.sd',
        '.sdf',
        '.mol',
        # ev74524: Add support for compressed sd.
        '.sdf.gz',
        '.sdfgz',
        '.sd.gz',
        '.mol.gz'
    ],
    MAESTRO: ['.bld'] + _CMS_EXTS + _MAE_EXTS,  # m2io supported files
    MAESTRO_STRICT: _MAE_EXTS,  # Used for browsing for Maestro files in GUIs
    CMS: _CMS_EXTS,  # Used for browsing for Desmond CMS files in GUIs
    SMILES: ['.smi', '.smi.gz', '.smigz'],
    SMILESCSV: ['.csv', '.csv.gz', '.csvgz'],
    CIF: ['.cif', '.mmcif', '.cif.gz'],
    PHASE_HYPO: ['.phypo'],
    SeqFormat.fasta: ['.fasta', '.fst', '.fas', '.seq', '.fa'],
    SeqFormat.swissprot: ['.sw', '.sp', '.swiss', '.swissprot'],
    SeqFormat.gcg: ['.gcg', '.msf'],
    SeqFormat.embl: ['.embl', '.emb'],
    SeqFormat.pir: ['.pir'],
    SeqFormat.clustal: ['.aln'],
    SeqFormat.csv: ['.csv'],
    PFX: ['.pfx'],
    'ALL': ['']
}

[docs]def get_file_extension(filename):
    """
    Return the file extension of the given file, including any suffixes prior
    to a ".gz" extension. For example::

        assert get_file_extension('myfile.txt') == '.txt'
        assert get_file_extension('test.mae.gz') == '.mae.gz'

    :param filename: File name to detect the format
    :type filename: str

    :return: format of the file.
    :rtype: str
    """
    basename, format = splitext(filename)
    return format

[docs]def get_file_format(filename):
    msg = 'This function is deprecated. Use get_file_extension() instead.'
    warnings.warn(msg, DeprecationWarning, stacklevel=3)
    return get_file_extension(filename)

[docs]def get_structure_file_format(filename: str) -> Optional[str]:
    """
    Return the format of a structure file, based on the filename extension.
    None is returned if the file extension is not recognized.

    :param filename: Filename to detect format

    :returns: File format or None if not recognized
    """
    # Lazy import for clean build to avoid swig built after module installation
    from schrodinger.infra import structure as infra
    infra_structure_formats = {
        infra.FileFormat.PDB: PDB,
        infra.FileFormat.MOL2: MOL2,
        infra.FileFormat.SD: SD,
        infra.FileFormat.MAESTRO: MAESTRO,
        infra.FileFormat.CIF: CIF,
        infra.FileFormat.XYZ: XYZ,
    }
    # will throw an exception for .smi or .csv files
    infra_format = None
    try:
        infra_format = infra.get_format_from_extension(filename)
    except:
        # infra.get_format_from_extension does not recognize
        # SMILES or SMILES_CSV
        pass
    basename, ext = splitext(filename.lower())
    if ext == '.gz':
        basename, base_ext = splitext(basename)
        ext = base_ext + ext
    if infra_format is None:
        for python_format in [SMILES, SMILESCSV]:
            if ext in EXTENSIONS[python_format]:
                return python_format
        return None
    # .phypo extension is FileFormat_MAESTRO in C++, but PHASE_HYPO in python
    if ext == ".phypo":
        return PHASE_HYPO
    return infra_structure_formats[infra_format]

[docs]def get_sequence_file_format(filename: str) -> Optional[str]:
    """
    Return the format of a sequence file, based on the filename extension.
    None is returned if the file extension is not recognized.

    :param filename: Filename to detect format

    :return: File format or None if not recognized
    """
    basename, ext = splitext(filename.lower())
    for format in SeqFormat:
        if ext in EXTENSIONS[format]:
            return format
    return None

[docs]def get_name_filter(name_mapping: Dict[str, List[str]]) -> List[str]:
    """
    Create filename filters for QFileDialog

    :param name_mapping: Mapping between category name and list of file types
        (must be keys of `EXTENSIONS`)

    :returns: List of filename filters
    """
    filters = []
    for name, categories in name_mapping.items():
        exts = (f"*{e}" for e in itertools.chain.from_iterable(
            EXTENSIONS[ftype] for ftype in categories))
        new_filter = "{name} ({exts})".format(name=name, exts=" ".join(exts))
        filters.append(new_filter)
    return filters

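# Editorial usage sketch (hypothetical mapping, not part of the original
# module): building a QFileDialog name filter from an EXTENSIONS category.
#   >>> get_name_filter({'Maestro Files': [MAESTRO_STRICT]})
#   ['Maestro Files (*.mae *.mae.gz *.maegz)']
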
[docs]def is_pdb_file(filename: str) -> bool:
    """
    Returns whether the specified filename represents a PDB file.

    :param filename: a filename

    :return: Whether the file is a pdb file.
    """
    return get_structure_file_format(filename) == PDB


[docs]def is_maestro_file(filename: str) -> bool:
    """
    Returns True if the specified filename represents a Maestro file.

    :param filename: a filename

    :return: Is this filename a maestro file?
    """
    return get_structure_file_format(filename) == MAESTRO


[docs]def is_sd_file(filename: str) -> bool:
    """
    Returns True if the specified filename represents an SD file.

    :param filename: a filename

    :return: Is this filename an SD file?
    """
    return get_structure_file_format(filename) == SD


[docs]def is_csv_file(filename: str) -> bool:
    """
    Returns True if the specified filename represents a CSV file.

    :param filename: a filename

    :return: Is this filename a csv file?
    """
    return get_structure_file_format(filename) == SMILESCSV


[docs]def is_smiles_file(filename: str) -> bool:
    """
    Returns True if the specified filename represents a SMILES file.

    :param filename: a filename

    :return: Is this filename a smiles file?
    """
    return get_structure_file_format(filename) == SMILES


[docs]def is_poseviewer_file(filename: str) -> bool:
    """
    See structurereader.h for documentation
    """
    # Lazy import for clean build to avoid swig built after module installation
    from schrodinger.infra import structure as infrastructure
    return infrastructure.is_glide_pose_viewer_file(filename)


[docs]def is_cms_file(filename: str) -> bool:
    """
    Returns True if the specified filename represents a CMS file.

    :param filename: a filename

    :return: Is this filename a CMS file?
    """
    return splitext(filename)[1] in _CMS_EXTS


[docs]def is_hypothesis_file(filename: str) -> bool:
    """
    Returns True if the specified filename represents a Phase hypothesis
    file. The .phypo extension corresponds to a gzipped Maestro file
    containing a single ct which is a Phase hypothesis.

    :param filename: a filename

    :return: Is this filename a Phase hypothesis file?
    """
    return get_structure_file_format(filename) == PHASE_HYPO


[docs]def strip_extension(filename: str) -> str:
    """
    Return a new file path without its extension. Suffixes such as "_pv" and
    "_epv" are also removed.
    """
    basename = splitext(filename)[0]
    for ext in ('_pv', '_epv'):
        if basename.endswith(ext):
            basename = basename[:-len(ext)]
    return basename


[docs]def get_basename(filename: str) -> str:
    """
    Returns the final component of the specified path name minus the
    extension. Suffixes such as "_pv" and "_epv" are also stripped.
    """
    return strip_extension(os.path.basename(filename))


[docs]def is_gzipped_structure_file(filename: str) -> bool:
    """
    Returns True if the filename represents a file that is gzipped and has a
    recognized structure extension.

    :param filename: a filename

    :return: Is this filename a gzipped structure file?
    """
    return (get_structure_file_format(filename) and filename.endswith("gz"))

#===============================================================================
# Job names
#===============================================================================

[docs]def is_valid_jobname(jobname: str) -> bool:
    """
    Returns True if the specified job name is valid: it is not empty, does
    not contain any illegal characters, and does not start with "." or "-".
    """
    if not jobname:
        return False
    # Jobs that start with "." are not allowed by Maestro (EV:125956)
    # Jobs that start with "-" are not allowed (PYAPP-4590)
    if jobname.startswith(".") or jobname.startswith("-"):
        return False
    for char in jobname:
        if char.isalnum():
            # alpha-numeric character
            continue
        elif char in ['_', '-', '.']:
            # allowed special characters
            continue
        else:
            # invalid character
            return False
    return True

[docs]def get_jobname(filename: str) -> str:
    """
    Returns a job name derived from the specified filename. Same as
    get_basename(), except that illegal characters are removed.
    """
    # Remove trailing slash, if there is one.
    filename = filename.rstrip('\\/')
    basename = get_basename(filename)
    jobname = ""
    for char in basename:
        if char.isalnum() or char in ['_', '-', '.']:
            jobname += char
        else:
            pass  # ignore illegal characters
    # Do not allow jobname to start with a "." or a "-":
    if jobname == "." or jobname == "-":
        jobname = ""
    return jobname

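# Editorial usage sketch (hypothetical path, not part of the original module):
# spaces are dropped and the trailing "_pv" suffix is stripped.
#   >>> get_jobname('/home/user/my ligand set_pv.maegz')
#   'myligandset'
#   >>> is_valid_jobname('run-01.final')
#   True
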
#===============================================================================
# Filename enumeration
#===============================================================================

[docs]def get_next_filename_prefix(path: str,
                              midfix: str,
                              zfill_width: int = 0) -> str:
    r"""
    Return the next filename prefix in the series <root><midfix><number>.

    Given a path (absolute or relative) to a filename or filename prefix,
    return the next prefix in the sequence implied by path and midfix. For
    example, with a path of /full/path/to/foo.mae, path/to/foo.mae or
    foo.mae, or /full/path/to/foo, path/to/foo or foo, and a midfix of '-',
    this function will return "foo-3" if a file with prefix foo-2 (and no
    higher-numbered foo-\*) is present. It will return foo-1 if no file with
    prefix foo-<number> is present. The net effect is that any file-name
    extension in the path argument is ignored.

    This function differs from get_next_filename() in that here, all files
    sharing the prefix contained in the path are searched, regardless of
    extension, and the next filename prefix is returned.

    The search is case sensitive or not, depending on the semantics of the
    file system. The leading directory of the path, if any, is included in
    the return value.

    Usage note: you might use this when the filename prefix could be shared
    by many files and you don't want to overwrite any of them. For example,
    you are starting up a job which will create many files with the same
    prefix.
    """
    # Decompose the path:
    (query_root, query_ext) = splitext(path)

    # Search directory for files whose prefixes are of form
    # <query_root><midfix><N>, where N is an integer:
    query_glob = ''.join([query_root, midfix, '*'])
    # starting index for number in filename prefix:
    start_num = len(query_root) + len(midfix)
    max_number_found = 0
    for fname in glob.iglob(query_glob):
        (prefix, ext) = splitext(fname)
        try:
            number = int(prefix[start_num:])
        except ValueError:
            # something other than a number was found starting at start_num
            continue
        max_number_found = max(max_number_found, number)

    return ''.join(
        [query_root, midfix,
         str(max_number_found + 1).zfill(zfill_width)])

[docs]def get_next_filename(path: str, midfix: str, zfill_width: int = 0):
    r"""
    Return the next filename in the series <root><midfix><number>.<ext>.

    Given a path (absolute or relative) to a filename, return the next
    filename in the sequence implied by path and midfix. For example, with a
    path of /full/path/to/foo.mae, path/to/foo.mae or foo.mae and a midfix of
    '-', this function will return "foo-3.mae" if file foo-2.mae (and no
    higher-numbered foo-\*.mae) is present. It will return foo-1.mae if no
    file named foo-<number>.mae is present.

    This function differs from get_next_filename_prefix() in that here, only
    files with the specified extension are searched, and the next full
    filename is returned.

    The search is case sensitive or not, depending on the semantics of the
    file system. The leading directory of the path, if any, is included in
    the return value.

    Usage note: You might use this when you are expecting to update only a
    single file: the one whose filename is given in the path. For example,
    you are exporting structures to a .mae file and you want to pick a
    non-conflicting name based on a user's filename specification.
    """
    # Decompose the path:
    (query_root, query_ext) = splitext(path)

    # Search directory for files whose names are of form
    # <query_root><midfix><N><query_ext>, where N is an integer:
    query_glob = ''.join([query_root, midfix, '*', query_ext])
    # starting index for number in filename prefix:
    start_num = len(query_root) + len(midfix)
    max_number_found = 0
    for fname in glob.iglob(query_glob):
        (prefix, ext) = splitext(fname)
        try:
            number = int(prefix[start_num:])
        except ValueError:
            # something other than a number was found
            continue
        max_number_found = max(max_number_found, number)

    return ''.join([
        query_root, midfix,
        str(max_number_found + 1).zfill(zfill_width), query_ext
    ])

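# Editorial usage sketch (hypothetical files, not part of the original
# module): with only out-2.mae present in the working directory,
#   >>> get_next_filename('out.mae', '-')
#   'out-3.mae'
#   >>> get_next_filename_prefix('out.mae', '-', zfill_width=3)
#   'out-003'
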
[docs]def get_mmshare_dir() -> str:
    r"""
    Return the path to the local $SCHRODINGER/mmshare-\*/ directory.

    :return: Path to the "mmshare" directory.
    """
    mmshare_exec = os.environ['MMSHARE_EXEC']
    return os.path.dirname(os.path.dirname(mmshare_exec))


[docs]def get_mmshare_data_dir() -> str:
    r"""
    Return the path of the local $SCHRODINGER/mmshare-\*/data/ directory.

    :return: Path to the "data" directory.
    """
    return os.path.join(get_mmshare_dir(), 'data')


[docs]def get_mmshare_scripts_dir() -> str:
    r"""
    Return the path of the $SCHRODINGER/mmshare-\*/python/scripts/ directory.

    :return: Path to the "scripts" directory.
    """
    return os.path.join(get_mmshare_dir(), 'python', 'scripts')


[docs]def get_mmshare_common_dir() -> str:
    r"""
    Return the path of the $SCHRODINGER/mmshare-\*/python/common/ directory.

    :return: Path to the "common" directory.
    """
    return os.path.join(get_mmshare_dir(), 'python', 'common')


[docs]def get_docs_dir() -> str:
    """
    Return the path to the local $SCHRODINGER/docs/ directory.

    :return: Path to the "docs" directory.
    """
    return os.path.join(os.path.dirname(get_mmshare_dir()), 'docs')

#===============================================================================
# Special directories
#===============================================================================

[docs]def get_directory_path(which_directory) -> str:
    """
    Return the Schrodinger-specific directory. If an invalid which_directory
    is specified, then a TypeError is thrown.

    Valid directories are:

    - HOME : To get user's home dir
    - APPDATA : To get the Schrodinger application shared data dir
    - LOCAL_APPDATA : To get the Schrodinger application local data dir
    - USERDATA : To get user's data dir
    - TEMP : To get default temporary data dir
    - DESKTOP : To get user's desktop dir
    - DOCUMENTS : To get user's 'My Documents' dir
    - NETWORK : To get user's 'My Network places' dir (only for Windows)

    :type which_directory: constant

    :rtype: str
    :return: Directory path
    """
    if which_directory == mm.DirectoryName_MMFILE_TEMP:
        return mm.get_schrodinger_temp_dir()
    return mm.mmfile_get_directory_path(which_directory)

def _deprecated_get_dir_warning():
    """
    Issue a deprecation warning which redirects users to call
    get_directory_path instead of the deprecated functions below.
    """
    msg = 'This function is deprecated. Use get_directory_path instead.'
    warnings.warn(msg, DeprecationWarning, stacklevel=3)

[docs]def get_directory(which_directory) -> (int, str):
    """
    :deprecated: Because this function behaves in a non-standard way by
        returning an mmlib status, `get_directory_path` is preferred.
    """
    _deprecated_get_dir_warning()
    requested_dir = get_directory_path(which_directory)
    # return tuple of (status, requested_dir) for consistency with
    # previous mmfile-based code.
    if requested_dir:
        return (0, requested_dir)
    else:
        return (1, requested_dir)

[docs]def get_home_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_HOME)


[docs]def get_appdata_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_APPDATA)


[docs]def get_local_appdata_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_LOCAL_APPDATA)


[docs]def get_desktop_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_DESKTOP)


[docs]def get_mydocuments_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_DOCUMENTS)


[docs]def get_mynetworkplaces_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_NETWORK)


[docs]def get_userdata_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_USERDATA)


[docs]def get_schrodinger_temp_dir() -> str:
    """
    :deprecated: get_directory_path should be used instead.
    """
    _deprecated_get_dir_warning()
    return get_directory_path(mm.DirectoryName_MMFILE_TEMP)

#===============================================================================
# PyMOL
#===============================================================================


def _check_for_pymol_by_platform(path: str) -> Optional[str]:
    """
    Check for a platform-specific executable file or script in the directory
    path provided.

    :param path: The directory path to check

    :return: The path to the PyMOL executable file, or None if PyMOL was not
        found in the given path
    """
    if os.path.isdir(path):
        windows = sys.platform == 'win32'
        if windows:
            names = ['pymol4maestro.bat', 'pymolwin.exe', 'pymol.bat']
        elif sys.platform.startswith('linux'):
            names = ['pymol4maestro', 'pymol']
        elif sys.platform == 'darwin':
            names = ['MacPyMOL']
        else:
            names = []
        for name in names:
            test_path = os.path.join(path, name)
            if os.path.isfile(test_path):
                return test_path
    return None


def _locate_pymol_in_registry(root_key: "winreg.PyHKEY",
                              keypath: str,
                              valuename: Optional[str] = None) -> str:
    """
    Find the PyMOL installation path under the given keypath and valuename.
    Check for standard PyMOL programs and return the PyMOL launch command
    path.

    :param root_key: root registry key
    :param keypath: registry keypath
    :param valuename: valuename of the key - is always None for this module

    :return: the path to the PyMOL executable, or None if not found

    :note: Any exception encountered during registry lookup results in None
        being returned
    """
    if winreg is None:
        return None
    try:
        key_handle = winreg.OpenKeyEx(root_key, keypath)
        (path, type) = winreg.QueryValueEx(key_handle, valuename)
        winreg.CloseKey(key_handle)
        path = os.path.normpath(path)
        pymol_launch_command = _check_for_pymol_by_platform(path)
        return pymol_launch_command
    except OSError:
        # An error of expected type
        return None
    except Exception as msg:
        # We don't expect any other type of exception, but if we encounter
        # one, let's record it so we can get reports and deal with it instead
        # of failing silently.
        print('Encountered an error attempting to find PyMOL in the registry:')
        print(str(msg))
        return None

[docs]def locate_darwin_pymol() -> Optional[str]:
    """
    Return the path to PyMOL on a macOS system. Return None if no PyMOL
    installations are found.
    """
    for dir_path in ["/Applications", os.environ.get("SCHRODINGER")]:
        pymols = Path(dir_path).glob('*PyMOL*.app')
        candidates = []
        for pymol in pymols:
            # PyMOL 2.x: highest priority (prepend)
            launch_command = str(pymol) + "/Contents/MacOS/PyMOL"
            if os.path.isfile(launch_command):
                candidates.insert(0, launch_command)
                continue
            # MacPyMOL: lowest priority (append)
            launch_command = str(pymol) + "/Contents/MacOS/MacPyMOL"
            if os.path.isfile(launch_command):
                candidates.append(launch_command)
        if candidates:
            return candidates[0]

[docs]def locate_pymol() -> Optional[str]:
    """
    Find the executable or script we use to launch PyMOL.

    :return: The pymol launch command or None if PyMOL was not found
    """
    # top priority: PYMOL4MAESTRO points to the executable
    path = os.environ.get('PYMOL4MAESTRO')
    if path and os.path.isfile(path):
        return path

    # First check paths from environment variables in a specific priority
    # order
    env_vars = ['PYMOL4MAESTRO', 'SCHRODINGER', 'PYMOL_PATH']
    env_paths = []
    for var in env_vars:
        path = os.environ.get(var)
        if path:
            env_paths.append(path)
            if var == 'SCHRODINGER':
                env_paths.append(os.path.join(path, 'pymol'))
    for path in env_paths:
        launch_command = _check_for_pymol_by_platform(path)
        if launch_command:
            return launch_command

    # for PyMOL prior to PyMOL v1.2r3s
    if sys.platform == 'win32':
        if winreg is not None:
            # Wow6432Node
            wow_node = 'SOFTWARE\\WOW6432Node\\Schrodinger\\PyMol\\PYMOL_PATH'
            # Standard registry location.
            std_node = 'SOFTWARE\\Schrodinger\\PyMol\\PYMOL_PATH'
            # Current user
            user = winreg.HKEY_CURRENT_USER
            # All users
            local = winreg.HKEY_LOCAL_MACHINE
            for node in [wow_node, std_node]:
                for utype in [user, local]:
                    launch_command = _locate_pymol_in_registry(utype, node)
                    if launch_command:
                        return launch_command
        # Keep below paths for fallback mechanism.
        paths = [
            'c:\\program files\\pymol\\pymol',
            'c:\\program files (x86)\\pymol\\pymol'
        ]
        for path in paths:
            launch_command = _check_for_pymol_by_platform(path)
            if launch_command:
                return launch_command

    # Mac
    if sys.platform == 'darwin':
        return locate_darwin_pymol()

    # 5th priority: look for pymol parallel to $SCHRODINGER
    path = os.environ.get('SCHRODINGER', None)
    if path:
        path = os.path.join(path, '..', 'pymol')
        launch_command = _check_for_pymol_by_platform(path)
        if launch_command:
            return launch_command

    # 6th priority: PATH
    pymol_launch_command = shutil.which("pymol")
    if pymol_launch_command:
        return pymol_launch_command

    # Return "None" at this point. We used to return just a simple "pymol"
    # but that's unlikely to work and we do need to know at this point
    # that the standard search has failed.
    return None

[docs]def get_pymol_cmd(use_x11: bool = False) -> List[str]:
    """
    Get a cmd list for launching PyMOL. This may include extra
    platform-specific arguments.

    :param use_x11: if True, causes -m to be added to the launch command on
        Mac

    :return: a cmd list with the executable as the first element and any
        other options following it.
    """
    pymol_exe = locate_pymol()
    cmd = [pymol_exe]
    if sys.platform == 'win32':
        cmd.append('+4')
    if sys.platform == 'darwin' and use_x11:
        cmd.append('-m')
    return cmd

#===============================================================================
# Misc
#===============================================================================

[docs]class chdir:
    """
    A context manager that carries out commands inside of the specified
    directory and restores the current directory when done.
    """

[docs]    def __init__(self, dirname: Union[Path, str]):
        self.dirname = dirname
        self.orig_dir = os.getcwd()

    def __enter__(self):
        os.chdir(self.dirname)

    def __exit__(self, *args):
        os.chdir(self.orig_dir)

[docs]def mkdir_p(path: str, *mode):
    """
    :deprecated: use `os.makedirs(path, exist_ok=True)`
    """
    try:
        os.makedirs(path, *mode)
    except OSError:
        if not os.path.isdir(path):
            raise

[docs]class tempfilename(str):

    def __new__(cls, prefix="tmp", suffix="", temp_dir=None):
        """
        A thread-safe replacement for tempfile.mktemp. Creates a temporary
        file with the given prefix, suffix, and directory and returns its
        filename. Calling remove will remove the file. Can be used as a
        context manager, which will remove the file automatically on exit.

        :param prefix: Filename prefix
        :type prefix: str

        :param suffix: Filename suffix
        :type suffix: str

        :param temp_dir: Filename directory path. Defaults to the schrodinger
            temp directory.
        :type temp_dir: str or None

        :return: The name of the file descriptor, wrapped in a context
            manager that will remove the file on exit.
        """
        if temp_dir is None:
            temp_dir = get_directory_path(TEMP)
        fd = tempfile.NamedTemporaryFile(prefix=prefix,
                                         suffix=suffix,
                                         delete=False,
                                         dir=temp_dir)
        # must close file on windows or StructureWriter will raise on access
        fd.close()
        return super().__new__(cls, fd.name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.remove()

[docs]    def remove(self):
        force_remove(self)

[docs]class TempStructureFile(tempfilename):

    def __new__(cls, sts):
        """
        Creates a temporary file containing a set of structures using the
        thread-safe implementation of tempfilename.

        :param sts: structures to write to the temp file
        :type sts: iterable of {structure.Structure}
        """
        from schrodinger.structure import StructureWriter
        temp_maegz = super().__new__(cls, suffix=".maegz")
        with StructureWriter(temp_maegz, overwrite=False) as fh:
            fh.extend(sts)
        return temp_maegz

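# Editorial usage sketch (not part of the original module): tempfilename as a
# context manager removes the file on exit; write_structures is a
# hypothetical helper.
#   >>> with tempfilename(suffix='.mae') as tmp_name:
#   ...     write_structures(tmp_name)
#   # tmp_name no longer exists here
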
[docs]def cat(source_filenames: List[str], dest_filename: str):
    """
    Concatenate the contents of the source files, writing them to a
    destination file. All files are specified by name.

    If source_filenames is an empty list, an empty file is produced.

    :param source_filenames: input files
    :param dest_filename: destination file
    """
    with open(dest_filename, 'wb') as fho:
        for fname in source_filenames:
            with open(fname, 'rb') as fh:
                shutil.copyfileobj(fh, fho)

[docs]def tar_files(tarname: str, mode: str, files: List[str]):
    """
    Writes files to a tar archive.

    :param tarname: Tar file name.
    :param mode: File open mode.
    :param files: Iterable over file names to be added to the archive.
    """
    with tarfile.open(tarname, mode) as th:
        for fn in files:
            if os.path.exists(fn):
                th.add(fn)

[docs]def zip_files(zipname: str, mode: str, files: List[str]):
    """
    Writes files to a zip archive.

    :param zipname: Zip file name.
    :param mode: File open mode.
    :param files: Iterable over file names to be added to the archive.
    """
    with zipfile.ZipFile(zipname, mode) as zp:
        for fn in files:
            if os.path.exists(fn):
                zp.write(fn)

[docs]def is_within_directory(directory, afile):
    dir_path = os.path.abspath(directory)
    prefix = os.path.commonprefix([dir_path, os.path.abspath(afile)])
    return prefix == dir_path

[docs]def safe_extractall_tar(tar, path=".", *args, **kwargs):
    """
    Extract all files from a tar file.

    Please see Python Vulnerability: CVE-2007-4559 for details on the issue
    with the tar.extractall() method. See the tar.extractall method
    description for details on args and kwargs.

    :param `tarfile.TarFile` tar: TarFile object
    :param str path: path of directory where tarfile will be extracted
    """
    for member in tar.getmembers():
        member_path = os.path.join(path, member.name)
        if not is_within_directory(path, member_path):
            raise tarfile.TarError(
                f"{member_path} is not in tarfile object. Attempted "
                f"path traversal in tarfile")
    tar.extractall(path=path, *args, **kwargs)

[docs]def safe_extractall_zip(zip_file, path=".", *args, **kwargs):
    """
    Extract all files from a zip file.

    Please see Python Vulnerability: CVE-2007-4559 for details on the issue
    with the extractall() method. See the zip_file.extractall method
    description for details on args and kwargs.

    :param `zipfile.ZipFile` zip_file: ZipFile object
    :param str path: path of directory where the zip file will be extracted
    """
    for member in zip_file.infolist():
        member_path = os.path.join(path, member.filename)
        if not is_within_directory(path, member_path):
            raise zipfile.BadZipfile(
                f"{member_path} is not in zipfile object. Attempted "
                f"path traversal in tarfile")
    zip_file.extractall(path=path, *args, **kwargs)

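# Editorial usage sketch (hypothetical archive names, not part of the original
# module): the safe_extractall_* helpers guard against path traversal.
#   >>> with tarfile.open('results.tar.gz', 'r:gz') as tar:
#   ...     safe_extractall_tar(tar, path='extracted')
#   >>> with zipfile.ZipFile('results.zip') as zf:
#   ...     safe_extractall_zip(zf, path='extracted')
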
[docs]def on_same_drive_letter(path_a: str, path_b: str) -> bool:
    """
    Returns True if path_a and path_b are on the same drive letter. On
    systems without drive letters, always returns True.
    """
    return os.path.splitdrive(path_a)[0] == os.path.splitdrive(path_b)[0]

[docs]def get_files_from_folder(folder_abs_path: str) -> List[Tuple[str, str]]:
    """
    Walk through a folder and find all files inside it.

    :param folder_abs_path: folder path

    :return: each tuple contains the absolute path of a file and a relative
        path that the file will be transferred to.
    """
    folder_abs_path = os.path.abspath(folder_abs_path)
    file_and_path = []
    dir_name = os.path.dirname(folder_abs_path)
    for root, dirs, files in os.walk(folder_abs_path):
        for file in files:
            abs_pathname = os.path.join(root, file)
            runtime_path = os.path.relpath(abs_pathname, start=dir_name)
            file_and_path.append(tuple([abs_pathname, runtime_path]))
    return file_and_path

[docs]@contextmanager
def change_working_directory(folder: Union[Path, str]):
    """
    A context manager to temporarily change the working directory to folder.

    :param folder: the folder that becomes the working directory
    """
    old_folder = os.getcwd()
    os.chdir(folder)
    try:
        yield
    finally:
        os.chdir(old_folder)

[docs]@contextmanager
def in_temporary_directory():
    """
    A context manager for executing a block of code in a temporary directory.
    """
    with tempfile.TemporaryDirectory() as tmp, change_working_directory(tmp):
        yield

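# Editorial usage sketch (not part of the original module): scratch files
# written inside the block disappear with the temporary directory.
#   >>> with in_temporary_directory():
#   ...     with open('scratch.txt', 'w') as fh:
#   ...         fh.write('intermediate data')
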
[docs]@contextmanager
def mmfile_path(path: Optional[str] = None):
    """
    Context manager and decorator that resets the mmfile search path on exit.
    If the optional `path` is supplied, it is set on entry.

    :param path: mmfile path to set while in the context
    """
    mm.mmfile_initialize(mm.error_handler)
    try:
        old_path = mm.mmfile_path_get()
        if path is not None:
            mm.mmfile_path_set(path)
        try:
            yield
        finally:
            mm.mmfile_path_set(old_path)
    finally:
        mm.mmfile_terminate()

[docs]def count_lines(filename: str) -> int:
    """
    Count the number of newlines in a file, in a way similar to "wc -l".

    :param filename: input filename

    :return: number of newlines in file
    """
    nlines = 0
    with open_maybe_compressed(filename, 'rb') as fh:
        while True:
            blob = fh.read(2**16)  # Empirically optimized block size.
            if not blob:
                break
            nlines += blob.count(b'\n')
    return nlines

[docs]def get_directory_size(dirpath):
    """
    Get the size of the given directory in MB (Note: MB => 1e6 bytes)

    :param str dirpath: The path to the directory

    :rtype: float
    :return: The size of the directory in MB
    """
    directory = Path(dirpath)
    bytesize = sum(
        x.stat().st_size for x in directory.glob('**/*') if x.is_file())
    return bytesize / 1e6

[docs]def get_existing_filepath(path_file: str) -> Optional[str]:
    """
    Look for the path/file at the given path, in the current working
    directory, and in the original launch directory. The first found path is
    returned.

    This can be useful when the file has been copied from path_file to the
    CWD, such as when launchapi copies a file from an absolute path on the
    local machine into the job directory on a remote machine. This can also
    be useful when large files (e.g. trajectory files) are not copied from
    path_file to the job launch dir for localhost jobs. The job in the
    current launch dir can access the files in the original launch dir.

    :type path_file: filename with path

    :return: None if the file cannot be located
    """
    valid_path = None
    if os.path.exists(path_file):
        # path_file exists (e.g., without jobcontrol; under jobcontrol using
        # the local host)
        valid_path = path_file
    elif os.path.exists(os.path.basename(path_file)):
        # Finds files from unix-like paths in the CWD (e.g., under jobcontrol,
        # job submission from a local Mac to a remote Linux host.
        # Change abs path c/dir1 to relative dir1)
        valid_path = os.path.basename(path_file)
    elif os.path.exists(ntpath.basename(path_file)):
        # Finds files from Windows-like paths in the CWD (e.g., under
        # jobcontrol, job submission from local Windows to a remote Linux
        # host. Change abs path 'C:\\dir' to relative dir1)
        valid_path = ntpath.basename(path_file)
    elif not os.path.isabs(path_file):
        # local import to avoid cyclic import between fileutils and jobcontrol
        from schrodinger.job.jobcontrol import get_backend
        backend = get_backend()
        if backend:
            job = backend.getJob()
            if job and job.Dir:
                filepath_orig_dir = os.path.join(job.Dir, path_file)
                if os.path.exists(filepath_orig_dir):
                    valid_path = filepath_orig_dir
    return valid_path

[docs]def xyz_to_sdf(xyz_filepath: str,
               out_sdf: Optional[str] = None,
               save_file: bool = True) -> str:
    """
    Convert an XYZ format file to an SDF file.

    :param xyz_filepath: filename with path
    :param out_sdf: the output sdf filename, if provided. If None, the
        out_sdf is auto-set based on the input filename
    :param save_file: If False, the output information is written to stdout
        instead of a file.

    :return: the output sdf filename

    :raise ValueError: input file has the wrong extension
    :raise RuntimeError: failed to convert the xyz file
    """
    basename, ext = splitext(xyz_filepath)
    if ext != XYZ_EXT:
        raise ValueError(f"{xyz_filepath} is not of xyz file extension.")
    if save_file:
        if out_sdf is None:
            out_sdf = basename + SDF_EXT
        out_sdf = os.path.basename(out_sdf)
        cmd = ['obabel', xyz_filepath, '-osdf', '-O', out_sdf]
    else:
        out_sdf = None
        cmd = ['obabel', xyz_filepath, '-osdf']
    try:
        process = subprocess.Popen(cmd,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
    except (TypeError, ValueError, OSError) as err:
        raise RuntimeError(str(err))
    stdout, stderr = process.communicate()
    if process.returncode or "\n0 molecule converted\n" in stderr.decode() or \
            (save_file and not os.path.exists(out_sdf)):
        raise RuntimeError(stderr.decode())
    return out_sdf

[docs]def open_maybe_compressed(filename: str, *a, **d) -> io.IOBase:
    """
    Open a file, using the gzip module if the filename ends in gz, or the
    builtin open otherwise. All arguments are passed through.
    """
    open_func = gzip.open if filename.lower().endswith('gz') else open
    return open_func(filename, *a, **d)

[docs]def get_csv_file_column_count(csv_file: str) -> int:
    """
    Return the number of columns in the csv file.

    :param csv_file: CSV file path.

    :return: Number of columns in the csv file.
    """
    with csv_unicode.reader_open(csv_file) as csvfile:
        reader = csv.reader(csvfile)
        try:
            header = next(reader)
        except StopIteration:
            return 0
        return len(header)

[docs]def hash_for_file(path, algorithm=hashlib.md5, buff_size=8388608):
    """
    Get a file hash, reading the file in blocks of buff_size bytes.

    :param str path: File path
    :param method algorithm: Algorithm to use
    :param int buff_size: Buffer size

    :rtype: str
    :return: File hash
    """
    with open(path, 'rb') as hash_file:
        digest = algorithm()
        while True:
            # Read in buff_size chunks so large files are not loaded into
            # memory all at once.
            block = hash_file.read(buff_size)
            if not block:
                break
            digest.update(block)
        return digest.hexdigest()

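# Editorial usage sketch (hypothetical file, not part of the original module):
#   >>> checksum = hash_for_file('results.maegz', algorithm=hashlib.sha256)
#   >>> len(checksum)  # length of a sha256 hex digest
#   64
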
[docs]def extended_windows_path(dos_path, only_if_required=True):
    """
    Convert the path to an absolute path and prepend the extended path tag to
    paths on Windows.

    :type dos_path: str
    :param dos_path: a Windows file path, which may be longer than 256
        characters and therefore invalid

    :type only_if_required: bool
    :param only_if_required: If True, prepend the Windows extended path tag
        only to paths that exceed WINDOWS_MAX_PATH in length; if False,
        always prepend it on Windows.

    :rtype: str
    :return: A Windows extended file path which can accommodate 30000+
        characters
    """
    abspath = os.path.abspath(dos_path)
    if sys.platform == 'win32' and (not only_if_required or
                                    len(abspath) > WINDOWS_MAX_PATH):
        if not abspath.startswith(WINDOWS_EXTENDED_PATH_TAG):
            if abspath.startswith("\\\\"):
                abspath = WINDOWS_EXTENDED_PATH_TAG + "UNC\\" + abspath[2:]
            else:
                abspath = WINDOWS_EXTENDED_PATH_TAG + abspath
    return abspath

[docs]def slugify(text):
    """
    Slugifies a filename for use in a URL or file name. Based on the Django
    implementation.
    (https://github.com/django/django/blob/dcebc5da4831d2982b26d00a9480ad538b5c5acf/django/utils/text.py#L400)

    :param text: Text to slugify
    :type text: str

    :return: Slugified text
    :rtype: str
    """
    text = str(text)
    text = (unicodedata.normalize("NFKD",
                                  text).encode("ascii",
                                               "ignore").decode("ascii"))
    text = re.sub(r"[^\w\s-]", "", text.lower())
    return re.sub(r"[-\s]+", "-", text).strip("-_")