Source code for schrodinger.application.matsci.textlogger

"""
Class to set up a nicely formatted logger

Copyright Schrodinger, LLC. All rights reserved."""

# Contributor: Thomas F. Hughes

import base64
import contextlib
from functools import wraps
import logging
import sys
import textwrap
import time

import schrodinger
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.job.remote_command import debug
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils

DESMOND_CUI_FLAG = ['trj', 'cms']
DEFAULT_LOG_LINE_WIDTH = 80
PLAIN_TEXT_COMMAND = 'PlainTextCmd'

DRIVER_LOG_EXT = '-driver.log'


[docs]def log_error(msg, logger=None): """ Add a message to the log file and exit with an error code :type msg: str :param msg: The message to log :type logger: `logging.Logger` :param logger: The logger object or None if msg should be printed """ log_msg(msg, logger=logger) log_msg('Finished', timestamp=True, logger=logger) sys.exit(1)
[docs]def log_msg(msg, timestamp=False, pad=False, pad_below=False, logger=None): """ Add a message to the log file :type msg: str :param msg: The message to log :type timestamp: bool :param timestamp: Whether to print a timestamp with the message :type pad: bool :param pad: Whether to pad above this message with a blank line :type pad_below: bool :param pad_below: Whether to pad below this message with a blank line :type logger: `logging.Logger` :param logger: The logger object or None if msg should be printed """ if timestamp: msg = msg + ' at ' + time.ctime() if pad: log(logger, "") log(logger, msg) if pad_below: log(logger, "")
[docs]def log(logger, msg, level=logging.INFO): """ Log a message if the logger is defined, else print it :type logger: `logging.Logger` :param logger: The logger object or None if no logger is defined :type msg: str :param msg: The message to log :type level: int :param level: The log level to use when logging. These are constants from the Python logging module. The default is INFO. """ if logger: logger.log(level, msg) else: print(msg)
[docs]def log_debug(logger, msg): """ Log a massage at the DEBUG level. :type logger: `logging.Logger` :param logger: The logger object or None if no logger is defined :param str msg: The message to log """ msg = 'DEBUG: %s at %s' % (msg, time.ctime()) log(logger, msg, level=logging.DEBUG)
[docs]def timeit_logger_debug(func): """ Log enter and leave time of a function in a class that may or may not contain self.logger attribute. If it doesn't, logging will be done to the stdout. """ @wraps(func) def wrapper(self, *args, **kwargs): try: logger = self.logger except AttributeError: logger = None log_debug(logger, 'Enter %s' % func.__name__) ret = func(self, *args, **kwargs) log_debug(logger, 'Leave %s' % func.__name__) return ret return wrapper
NAMESPACE_HEADER = 'Command line options:' JOB_HEADER = 'Job information'
[docs]def log_initial_data(logger, namespace, msgwidth=None, namespace_header=NAMESPACE_HEADER, job_header=JOB_HEADER): """ Log information about the currently running job, if running under job control, and all information in the namespace object. :type namespace: object :param namespace: The object whose properties should be logged. In typical use, this is an `argparse.Namespace` object. All the values in the __dict__ of this object will be logged. Note that __dict__ does not contain the double-underscore class methods. :type logger: `logging.Logger` :param logger: The logger to log with :type msgwidth: int :param msgwidth: The width of the log messages. Taken from the logger if not supplied :type namespace_header: str or None :param namespace_header: If not None, this line will be printed above the logged namespace properties :type job_header: str or None :param job_header: If not None, this line will be printed above the logged job properties """ if not msgwidth: msgwidth = logger.handlers[0].formatter.msgwidth log_jobinfo(logger, msgwidth=msgwidth, header=job_header) log(logger, "") log_namespace(namespace, logger, msgwidth=msgwidth, header=namespace_header)
[docs]def log_namespace(namespace, logger, msgwidth=None, header=NAMESPACE_HEADER): """ Log the value of all properties on a object, such as logging all the parameters of an `argparse.Namespace` object. :type namespace: object :param namespace: The object whose properties should be logged. In typical use, this is an `argparse.Namespace` object. All the values in the __dict__ of this object will be logged. Note that __dict__ does not contain the double-underscore class methods. :type logger: `logging.Logger` :param logger: The logger to log with :type msgwidth: int :param msgwidth: The width of the log messages. Taken from the logger if not supplied :type header: str or None :param header: If not None, this line will be printed above the logged properties """ if not msgwidth: msgwidth = logger.handlers[0].formatter.msgwidth if header: log(logger, header) try: keys = list(namespace.__dict__) except AttributeError: # This object has no __dict__ method, so nothing to log return # Log the properties in alphabetical order keys.sort() jc_options = {x.name for x in cmdline.Options} for attribute in keys: # Do not log job control properties (MATSCI-9553) if str(attribute).upper() in jc_options: continue value = namespace.__dict__[attribute] if isinstance(value, (list, tuple, set)): if attribute in DESMOND_CUI_FLAG: value = value[-1] else: value = ','.join([str(x) for x in value]) elif isinstance(value, dict): value = ','.join( ['%s=%s' % (str(a), str(b)) for a, b in value.items()]) log(logger, get_param_string(attribute, value, msgwidth))
[docs]def log_jobinfo(logger, msgwidth=None, header=JOB_HEADER): """ Log the information about the current running job if there is one. :type logger: `logging.Logger` :param logger: The logger to log with :type msgwidth: int :param msgwidth: The width of the log messages. Taken from the logger if not supplied :type header: str or None :param header: If not None, this line will be printed above the logged properties """ backend = jobcontrol.get_backend() if not backend: return if not msgwidth: msgwidth = logger.handlers[0].formatter.msgwidth if header: log(logger, header) job = backend.getJob() jobinfo = job.getApplicationHeaderFields() for key, value in jobinfo.items(): log(logger, get_param_string(key, value, msgwidth)) if key == 'Commandline' and 'BASE64' in value: ptext, raw_text = decode_base64_cmd(value) log(logger, get_param_string(PLAIN_TEXT_COMMAND, ptext, msgwidth)) log(logger, get_param_string('JobHostStartTime', time.ctime(), msgwidth))
[docs]def run_jobs_with_update_logging(jobq, logfn, fraction=0.10, interval=3500, add_subjob_files=True): """ Run all the current jobs in the JobDJ and logs progress information at semi-regular intervals so the user can track status :type jobq: `schrodinger.job.queue.JobDJ` :param jobq: The queue to run :type logfn: callable :param logfn: A function that takes a string and a boolean timestamp argument and logs the string :type fraction: float :param fraction: A value between 0 and 1. When this fraction of the total jobs has completed since the last log update, a new update will be triggered. :type interval: int :param interval: A new update will be triggered when this number of seconds have passed since the last log update :param bool add_subjob_files: Add subjob files to the backend when a subjob completes (if a backend exists) :note: fraction and interval are not obeyed strictly as updates can only be triggered when a job completes """ if add_subjob_files: # Delayed import to avoid circular imports from schrodinger.application.matsci import jobutils backend = jobcontrol.get_backend() else: backend = None # The queue may have already run some of its jobs already_done = jobq.total_finished + jobq.total_failed num_jobs = float(jobq.total_added - already_done) logfn('There are %d jobs to run' % num_jobs) num_completed = 0 last_log_fraction = 0 last_log_time = time.time() done_states = {queue.JobState.DONE, queue.JobState.FAILED} def on_job_status_change(job): """ Process status changes for a JobDJ job :param job: JobDJ's snapshot of job state upon going through a status change :type job: schrodinger.job.queue.BaseJob """ nonlocal num_completed nonlocal last_log_fraction nonlocal last_log_time if job.state in done_states: num_completed += 1 if backend: jobutils.add_subjob_files_to_backend(job, backend=backend, path=jobutils.FROM_SUBJOB) current_fraction = num_completed / num_jobs fraction_elapsed = current_fraction - last_log_fraction current_time = time.time() time_elapsed = current_time - last_log_time # Log status if more than fraction of jobs have completed or more than # inverval seconds have passed since we last did a status log if fraction_elapsed >= fraction or time_elapsed >= interval: percent = int(100 * current_fraction) logfn('%d%% of jobs have completed' % percent, timestamp=True) last_log_fraction = current_fraction last_log_time = current_time jobq.run(status_change_callback=on_job_status_change)
[docs]def create_logger(logfilename=None, extension=DRIVER_LOG_EXT, related_filename=None, verbose=False, width=DEFAULT_LOG_LINE_WIDTH): """ Create a logger that can be used for logging information to a file If running under job control, the log file this logger writes to will be added to the backend as a Log File. :type logfilename: str :param logfilename: The name of the log file the logger should write to. If not supplied, the name will be based off the Job name if running under job control or based off of the basename of a related_file if not. As a last resort, the logfile name will use 'logfile' as the base name. :type extension: str :param extension: The extension to add to the file name if logfilename is not supplied. :type related_filename: str :param related_filanem: The basename of this filename will be used as the base of the log file name IF logfilename is not supplied directly and we are not running under job control. :type verbose: bool, or None :param verbose: Whether to enable verbose logging. If None, the verbosity is automatically setup according to debugging mode. :type width: int :param width: Text width of the file :rtype: (`logging.Logger`, str) :return: The logger and the name of the file it logs to. """ if verbose is None: verbose = schrodinger.in_dev_env() backend = jobcontrol.get_backend() if not logfilename: if backend: job = backend.getJob() basename = job.Name elif related_filename: basename = fileutils.splitext(related_filename)[0] else: basename = 'logfile' logfilename = basename + extension if backend: backend.addLogFile(logfilename) loggerobj = GetLogger(logfilename, verbose, width) return loggerobj.logger, logfilename
[docs]@contextlib.contextmanager def driver_logging(related_name=None, options=None, log_time=True): """ Setup logging for the driver and log initial data and finish and run times :param str related_name: If not running under job control, the basename of this name will be used for the log file. :param Namespace options: The options to log :param bool log_time: Whether finish time and duration should be logged """ logger, _ = create_logger(related_filename=related_name) if options: log_initial_data(logger, options) if debug: logger.setLevel(logging.DEBUG) start_time = time.time() try: yield logger if log_time: log_msg('Finished', logger=logger, timestamp=True, pad=True) total_time = time.time() - start_time time_str = 'Total time: ' + pretty_time_delta(total_time) log_msg(time_str, logger=logger) finally: close_logger_file_handlers(logger)
[docs]def pretty_time_delta(seconds): """ Turn seconds into 1d 2h 3m 4s format :param float seconds: The number of seconds to convert :return str: The time as string """ seconds = int(seconds) days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) if days > 0: return '%dd %dh %dm %ds' % (days, hours, minutes, seconds) elif hours > 0: return '%dh %dm %ds' % (hours, minutes, seconds) elif minutes > 0: return '%dm %ds' % (minutes, seconds) else: return '%ds' % (seconds)
[docs]def get_param_string(identifier, value, msgwidth): """ Return a formatted line with identifier on the left and value right-justified. A line of '...' will consume the intervening space. :type identifier: str :param identifier: description of the value :type value: any :param value: the value of the descriptor :type msgwidth: int :param msgwidth: the length of the msg :rtype: str :return: msg, formatted msg """ uvalue = str(value) takenup = len(identifier) + len(uvalue) + 2 separatorlen = msgwidth - takenup msg = ' '.join([identifier, separatorlen * '.', uvalue]) return msg
[docs]class GetLogger(object): """ Set up a textwrapped log file. """
[docs] def __init__(self, logfilename, verbose, msgwidth): """ Create a GetLogger instance and process it. :type logfilename: str :param logfilename: name of the log file :type verbose: boolean :param verbose: enable verbose logging :type msgwidth: int :param msgwidth: message width """ self.logfilename = logfilename self.verbose = verbose self.msgwidth = msgwidth self.setItUp()
[docs] class MyFormatter(logging.Formatter): """ Extend logging.Formatter to customize the formats of different log levels and to textwrap the messages. """ FORMATS = { logging.DEBUG: '%(message)s', logging.INFO: '%(message)s', 'DEFAULT': '%(levelname)s (from function %(funcName)s):' \ '\n\n%(message)s\n' }
[docs] def __init__(self, msgwidth): logging.Formatter.__init__(self) self.msgwidth = msgwidth
# message strings passed in may have been defined using triple # quotes and so we first need to reformat the message string. # Define formatting depending on record.levelno.
[docs] def format(self, record): msglist = record.getMessage().splitlines() while '' in msglist: msglist.remove('') msglist = [line.strip() for line in msglist] newmsg = ' '.join(msglist) indent = ' ' * 4 if record.levelno == logging.INFO or \ record.levelno == logging.DEBUG: indent = '' msgwrap = textwrap.TextWrapper(width=self.msgwidth, initial_indent=indent, subsequent_indent=indent) record.message = msgwrap.fill(newmsg) fmt = self.FORMATS.get(record.levelno, self.FORMATS['DEFAULT']) return fmt % record.__dict__
[docs] def setItUp(self): """ Set up the logger. """ # use logging.DEBUG category to log verbose printing options and # use custom formatting. logger = logging.getLogger(self.logfilename) logger.setLevel(logging.INFO) if self.verbose: logger.setLevel(logging.DEBUG) handler = logging.FileHandler(self.logfilename, mode='w') handler.setFormatter(self.MyFormatter(self.msgwidth)) logger.addHandler(handler) self.logger = logger
[docs]def decode_base64_cmd(value): """ Extract a BASE64 encoded command line from the full command line in value and return it as plain text string :type value: str :param value: A full job control command line containing a -cmd BASE64 etc. encoded command line string :rtype: str, str :return: The second value is the raw decoded BASE64 value and the first value is the raw value modified to be similar to what a user would type to on the command line (double quotes, redirects, parens, "python" removed, $SCHRODINGER/run added) """ value = value.replace('"', "").replace("'", "") tokens = value.split() try: cmd_index = tokens.index('-cmd') except ValueError: raise ValueError('No -cmd flag found in value') if tokens[cmd_index + 1] != 'BASE64': raise ValueError('Command does not appear to be BASE64 encoded') bstring = tokens[cmd_index + 2] raw_command = base64.b64decode(bstring).decode() # Now turn this into something the user would actually use mod_command = raw_command.replace('"', "") mod_tokens = mod_command.split() # Remove the redirect try: mod_tokens = mod_tokens[:mod_tokens.index('>')] except ValueError: pass # Remove the surrounding '()' mod_tokens = [x for x in mod_tokens if x not in ['(', ')']] for badstart in ['python', '%SCHRODINGER%/run']: if mod_tokens[0] == badstart: mod_tokens = mod_tokens[1:] mod_tokens.insert(0, '$SCHRODINGER/run') # Create the final command final_cmd = ' '.join(mod_tokens) return final_cmd, raw_command
[docs]def close_logger_file_handlers(logger): """ Close all logging.FileHandler's for the given logger. :type logger: `logging.Logger` :param logger: the logger object """ for handler in logger.handlers[:]: if isinstance(handler, logging.FileHandler): handler.close() # Important for Windows logger.removeHandler(handler)
[docs]def create_debug_logger(name, formatter=None, add_to_backend=None, stream=True): """ Create a logger for module level debug usage. :param name: the logger name :type name: str :param formatter: the logging format :type formatter: 'logging.Formatter' :param add_to_backend: if True and logfile is created, add it to the backend. If None, add the file to backend in debug environment. :type add_to_backend: bool or None :param stream: if True and the newly created logfile is added to the backend, stream the file to the submission host :type stream: bool :return: The logger created for this function :rtype: `logging.Logger` """ # Delayed import to avoid circular imports from schrodinger.application.matsci import jobutils logger = logging.getLogger(name) is_debug = schrodinger.in_dev_env() effective_level = logging.DEBUG if bool(is_debug) else logging.INFO logger.setLevel(effective_level) if logger.handlers: return logger logfile = name + '.log' try: fileutils.force_remove(logfile) except PermissionError: # PermissionError: [WinError 32] The process cannot access the file # because it is being used by another process: pass backend = jobcontrol.get_backend() if backend: if add_to_backend is None: add_to_backend = is_debug if add_to_backend: jobutils.add_outfile_to_backend(logfile, backend, stream=stream) logger_handle = logging.FileHandler(logfile) if not formatter: formatter = logging.Formatter("%(levelname)s: %(message)s") logger_handle.setFormatter(formatter) logger.addHandler(logger_handle) return logger