# Source code for schrodinger.application.mopac.utils

# Copyright Schrodinger, LLC. All rights reserved.
# Contributors: Mike Beachy, Mark A. Watson

import errno
import glob
import os
import re
import shutil
import tempfile
import zipfile

# Values accepted for the ``scratch_cleanup`` argument of run_cleanup().
REMOVE = "remove"
ZIP = "zip"
SAVE = "save"


def open_file(basename, suffix):
    """
    Create and open a new, uniquely named file for writing.

    The first name tried is "basename.suffix". If that cannot be created
    because it already exists, "basename-<index>.suffix" is tried with
    "<index>" counting up from 1 until a file can be created.

    :type basename: str
    :param basename: Leading part of the file name (may include a directory).

    :type suffix: str
    :param suffix: File extension, without the leading dot.

    :returns: A tuple of (filename, file object); the file object is opened
        in text mode for writing.
    """
    # O_EXCL makes the creation fail rather than reuse a file that appeared
    # between the existence check and the open call.
    create_flags = os.O_EXCL | os.O_CREAT | os.O_WRONLY
    attempt = 0
    candidate = "%s.%s" % (basename, suffix)
    while True:
        if not os.path.exists(candidate):
            try:
                handle = os.fdopen(os.open(candidate, create_flags), "w")
            except OSError as err:
                # Only "already exists" means "try the next name".
                if err.errno != errno.EEXIST:
                    raise
            else:
                return candidate, handle
        attempt += 1
        candidate = "%s-%d.%s" % (basename, attempt, suffix)
def make_scratch_dir(tmpdir, basename):
    """
    Create a fresh scratch directory for a job so cleanup stays simple.

    :type basename: str
    :param basename: Name used to derive the scratch directory name. The
        directory created is named basename, or basename.1, basename.2,
        etc. when earlier names are already taken.

    :type tmpdir: str
    :param tmpdir: Directory in which the scratch directory is created.

    :returns: The absolute path of the directory that was created.
    """
    root = os.path.join(tmpdir, basename)
    candidate = root
    counter = 0
    while True:
        if not os.path.exists(candidate):
            try:
                os.mkdir(candidate)
            except OSError as err:
                # The name was taken between the check and the mkdir call;
                # anything else is a real failure.
                if err.errno != errno.EEXIST:
                    raise
            else:
                return os.path.abspath(candidate)
        counter += 1
        candidate = "%s.%d" % (root, counter)
def run_cleanup(results, start_dir, scr_dir, jobname, save_output_file,
                scratch_cleanup):
    """
    Cleanup from a subdirectory run. Unless scratch_cleanup is SAVE this
    removes the scratch dir entirely. The working directory is set back to
    start_dir before returning, including when an error is raised.

    :type results: MopacResults
    :param results: A MopacResults object (or a false value to skip). When
        the output file is copied back, its name is recorded on this object
        through set_output_file().

    :type start_dir: str
    :param start_dir: The launch directory for the job.

    :type scr_dir: str
    :param scr_dir: The scratch directory for the job as an absolute path.

    :type jobname: str
    :param jobname: The base job name.

    :type save_output_file: bool
    :param save_output_file: If True, copy the output file from the scratch
        dir back to the starting directory.

    :type scratch_cleanup: enum
    :param scratch_cleanup: If REMOVE, simply remove the scratch dir at the
        end of the job; if ZIP, create a zip file of the scratch directory
        contents; if SAVE, do no cleanup.
    """
    os.chdir(scr_dir)
    try:
        output_file = jobname + ".out"
        if save_output_file and os.path.isfile(output_file):
            shutil.copy(output_file, start_dir)
            if results:
                results.set_output_file(output_file)

        # Always copy .vis files up to the start_dir. They are registered
        # as outputfiles by the semi_emp.py driver
        for visfile in glob.glob('*.vis'):
            shutil.copy(visfile, start_dir)

        if scratch_cleanup == ZIP:
            filename, file_ = open_file(os.path.join(start_dir, jobname),
                                        "zip")
            # open_file() returns a text-mode handle, which cannot carry the
            # binary zip data. It is used only to reserve the unique name;
            # ZipFile then reopens that path itself in binary mode.
            file_.close()
            print("Creating zip file '%s'." % filename)
            try:
                zfile = zipfile.ZipFile(filename, "w", zipfile.ZIP_DEFLATED)
            except RuntimeError:
                # The zipfile docs say that ZIP_DEFLATED isn't always
                # available. If it isn't, just bundle up the files without
                # compression.
                zfile = zipfile.ZipFile(filename, "w")
            # The context manager closes the archive even if a write fails.
            with zfile:
                for f in os.listdir(os.path.curdir):
                    zfile.write(f, os.path.join(jobname, f))
    finally:
        # Never leave the process sitting inside the scratch directory.
        os.chdir(start_dir)

    if scratch_cleanup != SAVE:
        shutil.rmtree(scr_dir)
def is_mopac_file(filename):
    """
    Tell whether a file name looks like a MOPAC input file.

    The decision is based only on the name: it must end in ".mop" or
    ".dat" (case-sensitive).

    :type filename: str
    :param filename: The file name to examine.

    :returns: True if the name has a MOPAC input extension, else False.
    """
    return filename.endswith((".mop", ".dat"))
def cleanup_external(inputfile, start_dir):
    """
    If the input file has a relative path specification for an EXTERNAL
    file, rewrite it to the local dir, then copy the original file to the
    current directory.

    The input file is only replaced when at least one relative EXTERNAL
    entry was found; otherwise it is left untouched. The temporary file
    used for the rewrite is removed whenever it does not end up replacing
    the input file.

    :type inputfile: str
    :param inputfile: Path of the input file; rewritten in place if needed.

    :type start_dir: str
    :param start_dir: Directory against which relative EXTERNAL paths are
        resolved when copying the external file.
    """
    # Pattern to search for EXTERNAL keyword specification.
    external_re = re.compile(r"EXTERNAL *= *(\S+)", re.I)

    filename = None
    keep_temp = False
    newfd, newname = tempfile.mkstemp(dir=os.path.curdir)
    try:
        with os.fdopen(newfd, 'w') as newfile, open(inputfile) as oldfile:
            for line in oldfile:
                match = external_re.search(line)
                if match:
                    external_file = match.group(1)
                    if not os.path.isabs(external_file):
                        filename = os.path.basename(external_file)
                        replacement = "EXTERNAL=%s" % (filename,)
                        # A callable keeps the file name literal; a plain
                        # string would be parsed as a regex template, so
                        # backslashes in the name would be misinterpreted.
                        line = external_re.sub(lambda _m: replacement, line)
                        shutil.copy(os.path.join(start_dir, external_file),
                                    os.path.curdir)
                newfile.write(line)

        if filename:
            # From here on the temporary file holds the only rewritten copy,
            # so it must survive even if the remove/rename below fails.
            keep_temp = True
            os.remove(inputfile)
            os.rename(newname, inputfile)
    finally:
        # Nothing was rewritten (or an error occurred first): do not leave
        # the temporary file behind in the current directory.
        if not keep_temp and os.path.exists(newname):
            os.remove(newname)
def convert_sparse_dict_to_list(sdict):
    """
    Expand a sparse dictionary with 1-based integer keys into a dense list.

    :type sdict: dict
    :param sdict: dictionary keys must be integers >= 1. Key k is stored at
        list index k - 1.

    :returns: list whose length is the largest key, with non-key elements
        as None. An empty or None sdict gives an empty list.
        e.g. { 2:'a', 4:'b', 5:'c'} returns [None, 'a', None, 'b', 'c']

    :raises TypeError: if a key is not an integer.
    :raises RuntimeError: if a key is smaller than 1, or on any other
        failure while filling the list.
    """
    # unittested
    if not sdict:
        return []

    try:
        has_invalid_key = min(sdict) < 1
        lst = [None for _ in range(max(sdict))]
    except TypeError:
        raise TypeError('sdict expects integer keys!')

    # A key below 1 would become a negative index and silently overwrite an
    # entry counted from the end of the list, so reject it explicitly.
    if has_invalid_key:
        raise RuntimeError('sdict expects integer keys >= 1!')

    for idx, val in sdict.items():
        try:
            lst[idx - 1] = val
        except TypeError:
            raise TypeError('sdict expects integer keys!')
        except Exception:
            raise RuntimeError('problem in convert_sparse_dict_to_list')
    return lst