Source code for schrodinger.stepper.app

"""
This module provides infrastructure for expanding a stepper
workflow into an application for sharing and deploying with customers.

To use, subclass StepperApplication and implement the abstract methods (see
the docstring for StepperApplication for more info). By using StepperApplication,
you'll be implementing an interface with other stepper applications making
deployment and use of your stepper workflows much easier.

========
GLOSSARY
========

    Constants dict: These are dictionaries providing values that are constant
        within a deployment. These often define static/shared resources that
        are available to all compute nodes on a cluster.

    User dict: Dictionaries defining user inputs and settings for a particular
        workflow run. These often include parameters for how to run a workflow
        along with input files.

    Configuration dict: Configuration dicts are the combination of a user
        dict with a constants dict. They should define all information necessary
        to run the stepper application.
"""
from ruamel import yaml
import sys
import argparse
from typing import Iterable

from schrodinger.job import launchapi
from schrodinger.job import jobcontrol


def _get_schrodinger_product():
    if '-FROM' in sys.argv:
        return sys.argv[sys.argv.index('-FROM') + 1]
    return None


class StepperApplication:
    """
    Base class for all stepper applications. To use, subclasses *must*
    implement the following abstract methods:

        - runWorkflow
        - deploymentCheck
        - setUpTestUserDict
        - writeConfiguration

    The following methods are optional to implement but are highly
    recommended:

        - validate
        - getLocalInputFiles
        - getLocalInputFolders

    See the docstrings of the individual methods to see what is expected for
    each. Note that the docstrings for the abstract methods will be used as
    the help message for their associated subcommands
    (run/run_deployment_checks/test/write_config)

    After subclassing, you can expose your StepperApplication cmdline
    interface by creating a python file under `$SCHRODINGER/python/scripts`
    and setting `get_job_spec_from_args` and calling your app's `main` method.
    This is better described through example::

        # my_stepper_app.py
        class MyApp(StepperApplication):
            ... # Implement abstract methods here.

        get_job_spec_from_args = MyApp.get_job_spec_from_args

        if __name__ == '__main__':
            MyApp.main()
    """

    @classmethod
    def validate(cls, config_dict: dict):
        """
        Validate that the `config_dict` is properly configured. This is where
        subclasses will construct a stepper workflow and call
        `validateSettings()`. If there are any issues with the configuration,
        it's expected that this method will raise an exception.

        When the application is run as a job, validation will be run on the
        job-launching machine. The job itself will skip validation by default.
        If an application would like to validate during job execution, it's
        free to call this method within `runWorkflow`.

        NOTE::
            This is a good place to call `my_workflow.report()` so users can
            see the topology of the workflow they will run.

        :param config_dict: The configuration dictionary to run the
            application. See the module glossary for more info.
        """
        pass

    @classmethod
    def runWorkflow(cls, config_dict: dict):
        """
        This method does the actual running of the stepper workflow(s)
        associated with this stepper application.

        Often times the workflows will generate output files that users are
        interested in. To add output files or folders to be brought back from
        a job run, use `registerOutputFile` and `registerOutputFolder`.

        :param config_dict: The configuration dictionary used for setting up
            and running the workflow. Usually includes settings and inputs.
        """
        raise NotImplementedError

    @classmethod
    def deploymentCheck(cls, constants_dict: dict):
        """
        This method checks that a particular deployment of this application is
        set up correctly. It's expected that SAs will run this method on new
        deployments before beginning to run small tests.

        Some potentially useful checks that can go here:

            - Confirm all static files noted in `constants_dict` exist
            - Confirm the license capacity of the license server
            - Confirm cloud service (aws/gcp) credentials are set up correctly

        :param constants_dict: Dictionary providing values that are constant
            within a deployment. These often define static/shared resources
            that are available to all compute nodes on a cluster.
        """
        raise NotImplementedError

    @classmethod
    def setUpTestUserDict(cls, constants_dict: dict, large=False):
        """
        This method is used to create a user_dict for starting a test run of
        the application. The user dict will be used in conjunction with
        `constants_dict` to create a configuration to start a test run.

        Implementations of this method should be able to create a user dict
        for both small and large runs. Small runs should ideally run in <30m
        but should still exercise as much functionality as possible of the
        workflow(s).

        :param constants_dict: Dictionary providing values that are constant
            within a deployment. These often define static/shared resources
            that are available to all compute nodes on a cluster.

        :param large: Whether to set up a large test run rather than a small
            one.

        :returns: The user dict for the test run.
        """
        raise NotImplementedError

    @classmethod
    def writeConfiguration(cls, user_dict: dict, constants_yaml: str,
                           config_fname: str):
        """
        Given a user_dict and a constants yaml file, implementations of this
        method should write out a fully-fledged configuration file at
        `config_fname`.

        :param user_dict: Dictionary defining user inputs and settings for a
            particular workflow run.

        :param constants_yaml: Filepath to yaml file providing values that are
            constant within a deployment. These often define static/shared
            resources that are available to all compute nodes on a cluster.

        :param config_fname: Where to write the configuration dict to.
        """
        raise NotImplementedError

    @classmethod
    def getLocalInputFiles(cls, config_dict: dict) -> Iterable[str]:
        """
        Given a configuration dict, return the local input files required for
        workflow execution.

        NOTE::
            Static files should not be returned here.

        :param config_dict: The configuration dictionary used for setting up
            and running the workflow. Usually includes settings and inputs.
        :type config_dict: dict
        """
        return []

    @classmethod
    def getLocalInputFolders(cls, config_dict: dict) -> Iterable[str]:
        """
        Given a configuration dict, return the input folders required for
        workflow execution.

        NOTE::
            Static folders should not be returned here.

        :param config_dict: The configuration dictionary used for setting up
            and running the workflow. Usually includes settings and inputs.
        :type config_dict: dict
        """
        return []

    @classmethod
    def get_job_spec_from_args(cls, argv):
        """
        Build a job specification from a command line.

        :param argv: Full command line, including the script name at index 0.
            May be modified in place (see `get_job_spec_builder_from_args`).

        :returns: The result of `getJobSpec()` on the builder, or None when
            no builder is produced (i.e. for a `--dry-run`).
        """
        if jsb := cls.get_job_spec_builder_from_args(argv):
            return jsb.getJobSpec()
        return None

    @classmethod
    def get_job_spec_builder_from_args(cls, argv):
        """
        Implements the LaunchAPI method get_job_spec_from_args. Sets up a job
        to run on a host. If the user specifies `--dry-run` however, validation
        will be run instead and no job will be submitted.

        Note that `argv` is modified in place: a `test` command line is
        rewritten into the equivalent `run` command line, and
        `--skip-validation` is appended once validation has been performed
        here.

        :param argv: Full command line, including the script name at index 0.

        :returns: JobSpecificationArgsBuilder or None
        """
        args = cls.parseArgs(argv[1:])
        if args.subcmd == 'test':
            # Write the test configuration locally and turn the command line
            # into a plain `run` of that configuration.
            argv[1:] = cls._setUpRemoteTest(args)
            args = cls.parseArgs(argv[1:])

        if args.subcmd == 'run' and not args.skip_validation:
            cls.validate(load_yaml(args.config_yaml))
            if args.dry_run:
                return None
            # Validation already happened here, so tell the job to skip it.
            argv.append('--skip-validation')
            args = cls.parseArgs(argv[1:])

        inp_files = []
        inp_dirs = []
        # NOTE(review): `write_config` also defines `config_yaml`, but there
        # it is the *output* path, so it would be loaded here as an input.
        # Confirm whether launching `write_config` as a job is supported.
        if getattr(args, 'config_yaml', None):
            inp_files.append(args.config_yaml)
            # Reloaded on purpose so that any changes `validate` made to its
            # copy of the configuration don't leak into file discovery.
            config_dict = load_yaml(args.config_yaml)
            inp_files.extend(cls.getLocalInputFiles(config_dict))
            inp_dirs.extend(cls.getLocalInputFolders(config_dict))

        jsb = launchapi.JobSpecificationArgsBuilder(
            argv,
            schrodinger_product=_get_schrodinger_product(),
            use_jobname_log=True)
        if not jsb.getJobname():
            # Fall back to the application class name as the job name.
            jsb.setJobname(cls.__name__)

        if getattr(args, 'constants_yaml', None):
            inp_files.append(args.constants_yaml)
        for fname in inp_files:
            jsb.setInputFile(fname)
        for dir_ in inp_dirs:
            jsb.setInputDirectory(dir_)
        return jsb

    @classmethod
    def main(cls, args=None):
        """
        Parse the command line and dispatch to the selected subcommand.

        :param args: Command line arguments, excluding the script name. If
            None, `sys.argv[1:]` is used.
        """
        parsed = cls.parseArgs(args=args)
        handler = getattr(parsed, 'func', None)
        if handler is None:
            # No subcommand was selected (e.g. an empty command line), so no
            # `func` default was set. Report a usage error instead of failing
            # with an AttributeError.
            cls._getParser().error(
                'a subcommand is required (choose from run, write_config, '
                'run_deployment_checks, test)')
        handler(parsed)

    @classmethod
    def parseArgs(cls, args):
        """
        Parse `args` with the application's argument parser.

        If the first argument is not one of the known subcommands, `run` is
        assumed and prepended.

        :param args: Command line arguments, excluding the script name. If
            None, `sys.argv[1:]` is used.

        :returns: The parsed argument namespace.
        """
        parser = cls._getParser()
        if args is None:
            args = sys.argv[1:]
        subcommands = ('run', 'run_deployment_checks', 'test', 'write_config')
        if args and args[0] not in subcommands:
            # Unpacking (rather than list concatenation) accepts any sequence.
            return parser.parse_args(args=['run', *args])
        return parser.parse_args(args=args)

    @classmethod
    def registerOutputFile(cls, fname):
        """
        Register `fname` as an output file with the job control backend. Does
        nothing when there is no backend.

        :param fname: Path of the output file.
        """
        if backend := jobcontrol.get_backend():
            backend.addOutputFile(fname)

    @classmethod
    def registerOutputFolder(cls, dir_):
        """
        Register `dir_` as an output folder with the job control backend. Does
        nothing when there is no backend.

        :param dir_: Path of the output folder.
        """
        if backend := jobcontrol.get_backend():
            # NOTE(review): folders are registered through `addOutputFile`;
            # presumably the backend accepts directories - confirm.
            backend.addOutputFile(dir_)

    @classmethod
    def _getParser(cls):
        """
        Build the command line parser with one subparser per subcommand. Each
        subparser stores its handler under `func` and uses the docstring of
        the associated abstract method as its description.
        """
        raw_fmt = argparse.RawDescriptionHelpFormatter
        parser = argparse.ArgumentParser(description=cls.__doc__,
                                         formatter_class=raw_fmt)
        subparsers = parser.add_subparsers(dest='subcmd')

        subparser_run = subparsers.add_parser(
            'run',
            description=cls.runWorkflow.__doc__,
            formatter_class=raw_fmt)
        subparser_run.add_argument('config_yaml', type=str)
        grp = subparser_run.add_mutually_exclusive_group()
        grp.add_argument('--dry-run', action='store_true')
        grp.add_argument('--skip-validation', action='store_true')
        subparser_run.set_defaults(func=cls._invokeRunWorkflow)

        subparser_write_config = subparsers.add_parser(
            'write_config',
            description=cls.writeConfiguration.__doc__,
            formatter_class=raw_fmt)
        subparser_write_config.add_argument('user_yaml', type=str)
        subparser_write_config.add_argument('constants_yaml', type=str)
        subparser_write_config.add_argument('config_yaml', type=str)
        subparser_write_config.set_defaults(func=cls._invokeWriteConfig)

        subparser_run_deployment_checks = subparsers.add_parser(
            'run_deployment_checks',
            description=cls.deploymentCheck.__doc__,
            formatter_class=raw_fmt)
        subparser_run_deployment_checks.add_argument('constants_yaml',
                                                     type=str)
        subparser_run_deployment_checks.set_defaults(
            func=cls._invokeDeploymentCheck)

        subparser_test = subparsers.add_parser(
            'test',
            description=cls.setUpTestUserDict.__doc__,
            formatter_class=raw_fmt)
        subparser_test.add_argument('constants_yaml', type=str)
        subparser_test.add_argument('--large',
                                    action='store_true',
                                    default=False)
        subparser_test.set_defaults(func=cls._invokeTest)
        return parser

    @classmethod
    def _invokeWriteConfig(cls, args):
        """Handler for `write_config`: load the user yaml and write config."""
        user_dict = load_yaml(args.user_yaml)
        cls.writeConfiguration(user_dict, args.constants_yaml,
                               args.config_yaml)

    @classmethod
    def _invokeDeploymentCheck(cls, args):
        """Handler for `run_deployment_checks`."""
        constants_dict = load_yaml(args.constants_yaml)
        cls.deploymentCheck(constants_dict)

    @classmethod
    def _invokeTest(cls, args):
        """
        Handler for `test`: build a test user dict, write it out as
        `config.yaml`, then validate and run the resulting configuration.
        """
        constants_dict = load_yaml(args.constants_yaml)
        user_dict = cls.setUpTestUserDict(constants_dict, args.large)
        cls.writeConfiguration(user_dict, args.constants_yaml, 'config.yaml')
        config_dict = load_yaml('config.yaml')
        cls.validate(config_dict)
        cls.runWorkflow(config_dict)

    @classmethod
    def _setUpRemoteTest(cls, args):
        """
        Write the configuration for a test run to `test_config.yaml`.

        :returns: The argument list for a `run` of the written configuration.
        """
        # `setUpTestUserDict` takes the constants *dict*, not the yaml path,
        # so load the file first (as `_invokeTest` does).
        constants_dict = load_yaml(args.constants_yaml)
        user_dict = cls.setUpTestUserDict(constants_dict, args.large)
        config_fname = 'test_config.yaml'
        cls.writeConfiguration(user_dict, args.constants_yaml, config_fname)
        return f'run {config_fname}'.split()

    @classmethod
    def _invokeRunWorkflow(cls, args):
        """
        Handler for `run`: validate unless `--skip-validation` was given, and
        run the workflow unless `--dry-run` was given.
        """
        config_dict = load_yaml(args.config_yaml)
        if not args.skip_validation:
            cls.validate(config_dict)
        if not args.dry_run:
            cls.runWorkflow(config_dict)
def load_yaml(filename):
    """
    Read the yaml file at `filename` and parse it with the round-trip loader.

    :param filename: Path of the yaml file to read.

    :returns: The parsed yaml contents.
    """
    with open(filename) as stream:
        text = stream.read()
    return yaml.load(text, Loader=yaml.RoundTripLoader)
def write_yaml(config_dict, filename):
    """
    Serialize `config_dict` to `filename` as yaml with the round-trip dumper.

    :param config_dict: The data to write.

    :param filename: Path of the yaml file to (over)write.

    :returns: Whatever `yaml.dump` returns.
    """
    with open(filename, 'w') as stream:
        result = yaml.dump(config_dict, stream, Dumper=yaml.RoundTripDumper)
    return result