Source code for schrodinger.test.stu.common

"""
Functions that are used in one or more modules. Reduces danger of circular
dependencies.

@copyright: Schrodinger, Inc. All rights reserved.
"""

import datetime
import errno
import itertools
import os
import re
import shutil
import stat
import sys
import zipfile
from dataclasses import dataclass
from typing import BinaryIO
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Union

from schrodinger.utils import fileutils
from schrodinger.utils import log

logger = log.get_output_logger('stu_backend')

_api_key = None
"""
Module level cache for API key. Only accessible by one user, so this is OK.
"""

BASE_URL = 'https://stu.schrodinger.com'
# BASE_URL = 'https://stutest02.dev.bb.schrodinger.com'
"""Address of STU server"""

DATE_RE = r'^\d\d\d\d-\d\d-\d\d$'
"""The date format that our NBs use."""

BUILD_ID_RE = re.compile(r'build(?:\d{2}\b|-\d{3}\b)')
"""Updated build_id naming scheme."""
JOBID_RE = r'-[a-f0-9]{8}$|-[a-f0-9]{8}\.'

ZIP_MODE = zipfile.ZIP_DEFLATED


[docs]@dataclass class FileInfo: path: str size: int
def _gigabytes(gb: float) -> float: """Return bytes, input GiB""" return pow(1024, 3) * gb
[docs]class ZipError(Exception): """Error while zipping up files."""
[docs]def check_disk_usage(path): msg = "" usage = shutil.disk_usage(path) mega = 1e6 if (usage.free < 100 * mega or usage.used / usage.total > 0.99): msg = (f"Disk is {usage.used/usage.total:.1%} full " f"({usage.free/mega:.1f} MB available)") return msg
[docs]def str2list(string): """ Takes a string which can be a comma- or space-separated collection of positive integers and ranges and returns an ordered list of numbers. :param input_string: A comma-separated string of positive integers and ranges :type input_string: str :return: An ordered list of integers :rtype: list(int) """ output_set = set() # treat any number of commas and spaces between digits as a delimiter for part in re.split(r'(?<=\d)[,\s]+(?=\d)', string): x = re.split(' *- *', part) if len(x) > 2: raise TypeError('Problem converting "%s" to integer list' % string) try: output_set.update(list(range(int(x[0]), int(x[-1]) + 1))) except ValueError: raise TypeError('Problem converting "%s" to integer list' % string) return sorted(output_set)
[docs]def str2strlist(string): """ Split a string into a list of strings. Used in parser. """ if not string: return [] return string.split(',')
[docs]def get_api_key(): """ Get the user's API key. Uses caching. Also ensures that the user's API key is only readable by self - raises RuntimeError if anyone else has read permission. :rtype: str :return: User's API key from disk. """ global _api_key if _api_key: return _api_key if os.getenv('STU_APIKEY_PATH'): path = os.getenv('STU_APIKEY_PATH') else: paths_to_search = [ os.path.expanduser("~"), fileutils.get_directory_path(fileutils.APPDATA) ] if sys.platform == "win32": msys_home = r"c:\msys64\home\{}".format(os.environ.get("USERNAME")) paths_to_search.insert(0, msys_home) paths_to_search.insert(0, msys_home + ".schrodinger") for dir_path, ext, prefix in itertools.product(paths_to_search, ( ".txt", "", ), ( ".", "", )): path = os.path.join(dir_path, prefix + 'stu_apikey' + ext) if os.path.isfile(path): break if not os.path.isfile(path): msg = ('API Key not available at {path}{apikey_path_msg}. ' 'See the STU docs for more information: ' '"{stu_url}/doc/quickstart#stu_apikey".') if 'STU_APIKEY_PATH' not in os.environ: apikey_path_msg = ' (and STU_APIKEY_PATH not set)' else: apikey_path_msg = '' raise OSError( msg.format(path=path, stu_url=BASE_URL, apikey_path_msg=apikey_path_msg)) if hasattr(os, 'uname'): permissions = os.stat(path).st_mode if (permissions & stat.S_IRGRP or permissions & stat.S_IROTH or permissions & stat.S_IWGRP or permissions & stat.S_IWOTH): msg = ('API Key ({path}) is accessible by others. Please remove ' 'permission for other users to view this file. ' '(e.g. `chmod 600 {path}`). See the STU docs for more ' 'information: "{stu_url}/doc/quickstart#stu_apikey"') raise RuntimeError(msg.format(path=path, stu_url=BASE_URL)) with open(path) as fh: _api_key = fh.read().strip() logger.debug(f'Got STU API key from {path}') return _api_key
[docs]def assert_no_x(): xstub = "/usr/X11/bin/xstub" if not sys.platform.startswith("darwin"): return if os.path.exists(xstub) and not os.path.islink(xstub): raise AssertionError( "{} is incompatible with running STU on OS X. Remove file or symlink to /usr/bin/true" .format(xstub))
[docs]def assert_build_id_matches(buildtype, build_id): """Check that the build_id is appropriate for the buildtype""" if buildtype and not build_id: msg = f'Build ID is required when buildtype is {buildtype}' raise AssertionError(msg) if buildtype == 'OB' and not BUILD_ID_RE.match(build_id): msg = ( 'For Official Builds (OB), build_id must be of the format ' 'buildXX or build-XXX, where XX/XXX are the last two/three digits of the mmshare ' 'version.') raise AssertionError(msg) elif buildtype == 'NB' and not (re.match(DATE_RE, build_id) or BUILD_ID_RE.match(build_id)): msg = ('For Nightly Builds (NB), build_id must be of the format ' 'YYYY-MM-DD or build-XXX.') raise AssertionError(msg)
[docs]def verify_zip(fileobj): """ Attempt to open fileobj as a zipfile.ZipFile and check it for errors using ZipFile.testzip() (file headers and CRC32 check for all files). :raise ZipError: If ZipFile.testzip() retuns a non-None value, indicating a corrupted file. """ with zipfile.ZipFile(fileobj, 'r') as zf: corrupted_file = zf.testzip() if corrupted_file: raise ZipError('Generated zip file is invalid: ' f'{corrupted_file} is corrupted')
[docs]def zip_files(fileobj, relative_to, filenames): """ Zip a list of files into an archive. Relative paths are relative to `relative_to`. """ zf = zipfile.ZipFile(fileobj, 'w', ZIP_MODE) start_dir = os.getcwd() bad_symlinks = [] try: os.chdir(relative_to) for f in filenames: _add_file_to_zip(zf, f, f, bad_symlinks) finally: os.chdir(start_dir) if bad_symlinks: missing = 'Symlinks with missing destinations: %s' % ', '.join( bad_symlinks) raise ZipError(missing)
def _add_file_to_zip(archive, absolute_path, relative_path, bad_symlinks): if os.path.islink(absolute_path): if not os.path.exists(absolute_path): bad_symlinks.append('{} -> {}'.format(relative_path, os.readlink(absolute_path))) zfi = zipfile.ZipInfo(relative_path) # symlink magic numbers, from: # http://www.mail-archive.com/python-list@python.org/msg34223.html zfi.create_system = 3 zfi.external_attr = 2716663808 archive.writestr(zfi, os.readlink(absolute_path)) else: try: _add_regular_file_to_zip(archive, absolute_path, relative_path) except ValueError as e: logger.exception( f"Could not write {absolute_path} to {relative_path}") raise def _add_regular_file_to_zip(archive, absolute_path, relative_path): with open(absolute_path, 'rb') as fh: zfi = zipfile.ZipInfo.from_file(absolute_path, arcname=relative_path) with archive.open(zfi, 'w') as zh: while True: chunk = fh.read(1024 * 1024) if not chunk: break zh.write(chunk) def _sort_files_by_size(files: Iterable[str]) -> List[FileInfo]: """ Return list of files sorted by size. :param files: List of absolute paths to files to be sorted by size. :return: List of FileInfo objects, sorted by largest files last. """ file_infos = [] for filepath in files: try: filesize = os.stat(filepath).st_size except FileNotFoundError: # File can disappear between checking existence and size continue file_infos.append(FileInfo(path=filepath, size=filesize)) return sorted(file_infos, key=lambda x: x.size)
[docs]def prune_largest_files(files: Iterable[str]) -> List[str]: """ Remove largest files first, one by one off the back, until total file list size is beneath the size_threshold. :param files: List of absolute paths to files to be pruned until total file list size is within size_threshold :return: List of absolute paths of files to be zipped. """ file_infos = _sort_files_by_size(files) size_threshold = _gigabytes(2) while len(file_infos) > 0: total_size = sum(s.size for s in file_infos) if total_size < size_threshold: break file_infos.pop() return [f.path for f in file_infos]
[docs]def zip_directory(zip_root: str, fileobj: Optional[Union[str, BinaryIO]] = None, skipped_files: Optional[Set[str]] = None): """ Zip the contents of a directory. File names will be relative to the directory name. Preserves symlinks. :param zip_root: Directory to be zipped :param fileobj: Filename or file-like object for the output zipfile. If not specified, the basename of zip_root + .zip is used. :param skipped_files: Names of files that should be excluded from the zip archive. """ bad_symlinks = [] all_files = [] missing_files = [] other_errors = [] # Unconditionally use the extended path tag on windows, even if zip_root # itself doesn't need to. This avoids any conditionals in the os.walk logic # because "root" will be a valid long path. zip_root = fileutils.extended_windows_path(zip_root, only_if_required=False) if fileobj is None: fileobj = os.path.basename(zip_root) + '.zip' with zipfile.ZipFile(fileobj, 'w', ZIP_MODE) as zf: for root, dirs, files in os.walk(zip_root): for filename in files: absname = os.path.join(root, filename) relname = os.path.relpath(absname, zip_root) if skipped_files and relname in skipped_files: continue all_files.append(absname) # Allow saving empty directories. for dirname in dirs: absname = os.path.join(root, dirname) relname = os.path.relpath(absname, zip_root) zfi = zipfile.ZipInfo(relname + '/') # give read and write access to the directory zfi.external_attr = 0o750 << 16 last_modified = os.path.getmtime(absname) last_modified = datetime.datetime.fromtimestamp(last_modified) zfi.date_time = last_modified.timetuple() zf.writestr(zfi, '') files_to_zip = prune_largest_files(all_files) for absname in files_to_zip: relname = os.path.relpath(absname, zip_root) try: _add_file_to_zip(zf, absname, relname, bad_symlinks) except OSError as err: if err.errno == errno.ENOENT: # It's OK if temporary jobcontrol files are removed # during processing if not re.search(JOBID_RE, absname): missing_files.append(relname) elif err.errno in {errno.ENXIO, errno.EOPNOTSUPP}: # we just avoid adding unix sockets continue else: other_errors.append(f'{relname}:{err}') if bad_symlinks: missing = 'Symlinks with missing destinations: %s' % ', '.join( bad_symlinks) raise ZipError(missing) if missing_files: missing = 'Files that disappeared while being zipped up : %s' % ', '.join( missing_files) raise ZipError(missing) if other_errors: missing = 'Files with other problems during zip: \n %s' % '\n '.join( other_errors) raise ZipError(missing)