Source code for schrodinger.application.vss.tasks

import contextlib
import itertools
import os
import pprint
import shlex
import subprocess
from collections import namedtuple

import jinja2

from schrodinger.application.vss import csvsmiles
from schrodinger.application.vss import database
from schrodinger.application.vss import pilot
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import fileutils
from schrodinger.utils import log

JobAndOutcomes = namedtuple('JobAndOutcomes', ['job', 'outcomes'])
JobAndOutcomes.__doc__ += ': job handle & output filenames'
JobAndOutcomes.job.__doc__ = 'job handle'
JobAndOutcomes.outcomes.__doc__ = 'notable output filenames'

logger = log.get_output_logger('vss')


def get_jobname(ctrl, default='vs'):
    return jobcontrol.get_jobname() or ctrl.jobname or default

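# Usage sketch (illustrative only, not part of the module): the jobname is
# resolved in priority order -- job-control jobname, then ``ctrl.jobname``,
# then the default. ``_Ctrl`` below is a hypothetical stand-in for a
# RunnerControl object; outside of job control, jobcontrol.get_jobname() is
# assumed to return a falsy value.
#
#   >>> class _Ctrl:
#   ...     jobname = 'my_screen'
#   >>> get_jobname(_Ctrl())
#   'my_screen'
#   >>> _Ctrl.jobname = None
#   >>> get_jobname(_Ctrl(), default='vs-pilot')
#   'vs-pilot'
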
def make_job(cmd):
    logger.info('job: %s', ' '.join(map(shlex.quote, cmd)))
    return queue.JobControlJob(command=cmd, command_dir=os.getcwd())

def make_dise_job(dise_ctrl, *, infile, n_total):
    '''
    :param dise_ctrl: DiSE parameters.
    :type dise_ctrl: `schrodinger.application.vss.control.DiseControl`

    :param n_total: Total number of ordered compounds to be DiSE-ed.
    :type n_total: int

    :param infile: Ordered compounds.
    :type infile: str

    :return: DiSE job and output file name.
    :rtype: (`schrodinger.job.queue.JobControlJob`, str)
    '''

    jobname = fileutils.get_jobname(infile)
    root, ext = fileutils.splitext(infile)
    outfile = f'{jobname}-dise_cpds{ext}'

    cmd = [
        'run', 'dise_select.py', infile,
        '-JOBNAME', jobname,
        '-sim_threshold', str(dise_ctrl.similarity),
        '-seed_cpds', str(int(dise_ctrl.seed * n_total + 0.5)),
        '-num_cpds', str(n_total)
    ]  # yapf: disable

    return (make_job(cmd), outfile)

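# Worked example (illustrative; assumes fileutils.get_jobname() strips the
# extension): for infile='ranked.csv', similarity=0.7, seed=0.2 and
# n_total=500, the seed count is int(0.2 * 500 + 0.5) == 100 and the job
# command is
#
#   run dise_select.py ranked.csv -JOBNAME ranked -sim_threshold 0.7 \
#       -seed_cpds 100 -num_cpds 500
#
# with 'ranked-dise_cpds.csv' returned as the expected output filename.
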
def make_glide_jobs(cfg, ctrl):
    '''
    Run glide for actives/decoys only (not "databases").

    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`

    :return: Glide job and output file names or None.
    :rtype: `JobAndOutcomes` or NoneType
    '''

    sources = [s for s in [ctrl.actives, ctrl.decoys] if s]
    if not ctrl.glide or not sources:
        return None

    jobname = f'{get_jobname(ctrl)}_glide'

    ligands = f'{jobname}_ligands.csv'
    pilot.extract_smiles_and_cids(sources, ligands)

    glide_inp = f'{jobname}.inp'
    with open(glide_inp, 'w') as fp:
        params = {'gridfile': ctrl.glide.grid, 'ligandfile': ligands}
        fp.write(jinja2.Template(cfg.glide_inp).render(params))

    cmd = ['glide', glide_inp]
    if cfg.glide_host:
        cmd += ['-HOST', cfg.glide_host]
    if cfg.glide_driver_host:
        cmd += ['-DRIVERHOST', cfg.glide_driver_host]

    return JobAndOutcomes(make_job(cmd), [f'{jobname}_pv.maegz'])

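# Sketch of the Glide input templating above. The template text here is an
# assumption (the real one is taken from ``cfg.glide_inp``); it only shows
# how the 'gridfile'/'ligandfile' parameters are substituted.
#
#   >>> import jinja2
#   >>> tmpl = 'GRIDFILE {{ gridfile }}\nLIGANDFILE {{ ligandfile }}'
#   >>> print(jinja2.Template(tmpl).render(gridfile='receptor_grid.zip',
#   ...                                    ligandfile='vs_glide_ligands.csv'))
#   GRIDFILE receptor_grid.zip
#   LIGANDFILE vs_glide_ligands.csv
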
def make_glide_al_jobs(cfg, ctrl):
    '''
    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`

    :return: Glide AL job and output file names or None.
    :rtype: `JobAndOutcomes` or NoneType
    '''

    if not ctrl.glide_al:
        return None

    smiles = list(cfg.get_smiles_csv(ctrl.databases))
    if not smiles:
        return None

    jobname = f'{get_jobname(ctrl)}_glide_al'

    cpds_list_file = f'{jobname}_cpds'
    with open(cpds_list_file, 'w') as fp:
        fp.write('\n'.join(smiles))

    cmd = [
        'run', '-FROM', 'glide', 'glide_active_learning.py', 'screen',
        '-grid', jobcontrol.get_runtime_path(ctrl.glide_al.grid),
        '-infile_list_file', cpds_list_file,
        '-smi_index', '1',
        '-name_index', '2',
        '-train_time', str(ctrl.glide_al.training_time),
        '-train_size', str(cfg.glide_al_train_size),
        '-num_iter', str(cfg.glide_al_num_iter),
        '-keep', str(ctrl.glide_al.n_report),
        '-jobname', jobname
    ]  # yapf: disable

    if cfg.ligprep_args is not None:
        cmd += ['-ligprep_args', ' '.join(cfg.ligprep_args)]
    if cfg.glide_al_njobs:
        cmd += ['-num_train_core', str(cfg.glide_al_njobs)]
    if cfg.glide_al_glide_subjob_size:
        cmd += ['-glide_subjob_size', str(cfg.glide_al_glide_subjob_size)]
    if cfg.glide_al_eval_host:
        cmd += ['-HOST', cfg.glide_al_eval_host]
    if cfg.glide_al_driver_host:
        cmd += ['-DRIVERHOST', cfg.glide_al_driver_host]
    if cfg.glide_al_train_host:
        cmd += ['-train_host', cfg.glide_al_train_host]
    if cfg.glide_al_num_rescore_ligand:
        cmd += ['-num_rescore_ligand', str(cfg.glide_al_num_rescore_ligand)]
    if cfg.glide_al_block_size:
        cmd += ['-block_size', str(cfg.glide_al_block_size)]

    glide_al_outfile = f'{jobname}_pred_iter_{cfg.glide_al_num_iter}.csv'
    glide_al_job = make_job(cmd)

    if ctrl.dise:
        dise_job, dise_outfile = make_dise_job(ctrl.dise,
                                               infile=glide_al_outfile,
                                               n_total=ctrl.glide_al.n_report)
        dise_job.addPrereq(glide_al_job)
        return JobAndOutcomes(dise_job, [dise_outfile])
    else:
        return JobAndOutcomes(glide_al_job, [glide_al_outfile])

def get_ligprep_command(cfg, infile, outfile, jobname=None):
    cmd = ['ligprep', '-ismi', infile, '-omae', outfile, '-NJOBS', '1']
    if cfg.ligprep_args is not None:
        cmd += cfg.ligprep_args
    if jobname:
        cmd += ['-JOBNAME', jobname]
    return cmd

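# Example of the resulting command (hypothetical values; assumes
# ``cfg.ligprep_args == ['-epik']``):
#
#   ligprep -ismi query.smi -omae query.maegz -NJOBS 1 -epik -JOBNAME vs_shape_query
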
def get_shape_query(cfg, ctrl):
    '''
    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`

    :return: Shape query filename (maestro) and job handle.
    :rtype: (str, `schrodinger.job.queue.JobControlJob` or NoneType)
    '''

    if not ctrl.shape:
        return (None, None)

    jobname = f'{get_jobname(ctrl)}_shape_query'

    if ctrl.shape.query:
        query = jobcontrol.get_runtime_path(ctrl.shape.query)
        query_format = fileutils.get_structure_file_format(query)
        if query_format in (fileutils.MAESTRO, fileutils.SD):
            return (query, None)
        elif query_format in (fileutils.SMILES, fileutils.SMILESCSV):
            query_lp = f'{jobname}.maegz'
            lp_cmd = get_ligprep_command(cfg, query, query_lp, jobname=jobname)
            query_job = make_job(lp_cmd)
            return (query_lp, query_job)
        else:
            raise ValueError(f'unsupported shape query format: {query_format}')
    else:
        actives = jobcontrol.get_runtime_path(ctrl.actives.filename)
        actives_mae = f'{jobname}_actives.maegz'
        lp_cmd = get_ligprep_command(cfg,
                                     actives,
                                     actives_mae,
                                     jobname=f'{jobname}_actives')
        lp_job = make_job(lp_cmd)

        query_cmd = [
            'run', 'shape_pick_diverse.py',
            '-shape_type', ctrl.shape.shape_type,
            actives_mae,
            '-JOBNAME', f'{jobname}_pick'
        ]
        if cfg.shape_host:
            query_cmd += ['-HOST', cfg.shape_host]

        query_job = make_job(query_cmd)
        query_job.addPrereq(lp_job)

        query = f'{jobname}_pick-out.maegz'
        return (query, query_job)

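# Summary of the three get_shape_query() outcomes above (illustrative):
#
#   query file is Maestro/SD   -> (query, None)                # used as-is
#   query file is SMILES/CSV   -> ('<jobname>.maegz', LigPrep job)
#   no query given             -> ('<jobname>_pick-out.maegz',
#                                  shape_pick_diverse.py job chained after
#                                  a LigPrep job on the actives)
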
def make_shape_jobs(cfg, ctrl):
    '''
    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`

    :return: Shape job and output file names or None.
    :rtype: `JobAndOutcomes` or NoneType
    '''

    if not ctrl.shape:
        return None

    shape_data, shape_data_is_local = cfg.get_shape_data(
        ctrl.databases, shape_type=ctrl.shape.shape_type)
    if not shape_data:
        return None

    jobname = f'{get_jobname(ctrl)}_shape'
    shape_data_treatment = 'copy' if shape_data_is_local else 'remote'

    query, query_job = get_shape_query(cfg, ctrl)
    outfiles = [query] if query_job else []

    shape_data_list_file = f'{jobname}_shape_data_list'
    with open(shape_data_list_file, 'w') as fp:
        fp.write('\n'.join(shape_data))

    cmd = [
        'shape_screen_gpu', 'run',
        '-shape', query,
        '-screen_list', shape_data_list_file,
        '-shape_data_treatment', shape_data_treatment,
        '-keep', str(ctrl.shape.n_report),
        '-JOBNAME', jobname
    ]  # yapf: disable

    if cfg.shape_host:
        cmd += ['-HOST', cfg.shape_host]
    if cfg.shape_driver_host:
        cmd += ['-DRIVERHOST', cfg.shape_driver_host]

    shape_job = make_job(cmd)
    if query_job:
        shape_job.addPrereq(query_job)
        ctrl.shape.query_is_synthetic = True

    shape_outfile = f'{jobname}-out.maegz'

    if ctrl.dise:
        dise_job, dise_outfile = make_dise_job(ctrl.dise,
                                               infile=shape_outfile,
                                               n_total=ctrl.shape.n_report)
        dise_job.addPrereq(shape_job)
        return JobAndOutcomes(dise_job, outfiles + [dise_outfile])
    else:
        return JobAndOutcomes(shape_job, outfiles + [shape_outfile])

def make_ligand_ml_training_set(cfg, ctrl):
    '''
    Concatenates actives and decoys into a single file (the "training set"
    to be used for Ligand ML).

    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`

    :return: CSV SMILES file with "truth" column.
    :rtype: `csvsmiles.CsvSmilesFile`
    '''

    outcome = csvsmiles.CsvSmilesFile(
        filename=get_new_filename(f'{get_jobname(ctrl)}_training_set', '.csv'),
        cid_col=database.CID_COL,
        truth_col=database.TRUTH_COL,
        smiles_col=database.SMILES_COL)

    with contextlib.ExitStack() as stack:
        writer = outcome.get_dict_writer(
            stack,
            fieldnames=[
                database.SMILES_COL, database.CID_COL, database.TRUTH_COL
            ])
        writer.writeheader()

        for (src, t) in zip([ctrl.actives, ctrl.decoys], [1, 0]):
            for row in src.get_dict_reader(stack):
                writer.writerow({
                    database.SMILES_COL: row[src.smiles_col],
                    database.CID_COL: row[src.cid_col],
                    database.TRUTH_COL: t
                })

    return outcome

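# Layout of the training-set CSV written above. The actual header names come
# from the ``database`` module constants; the rows shown are placeholders.
# Actives are written with truth 1, decoys with truth 0, in that order.
#
#   <SMILES_COL>,<CID_COL>,<TRUTH_COL>
#   c1ccccc1O,active-001,1
#   CCOC(=O)CC,decoy-001,0
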
def make_ligand_ml_jobs(cfg, ctrl):
    '''
    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`

    :return: Ligand ML job and output file names or None.
    :rtype: `JobAndOutcomes` or NoneType
    '''

    if not ctrl.ligand_ml:
        return None

    smiles = list(cfg.get_smiles_csv(ctrl.databases))
    if not smiles:
        return None

    jobname = f'{get_jobname(ctrl)}_ligand_ml'
    num_subjobs = cfg.ligand_ml_njobs or 1

    model = f'{jobname}.qzip'

    base_cmd = [
        'run', 'ligand_ml_driver.py', model
    ]  # yapf: disable

    training_set = make_ligand_ml_training_set(cfg, ctrl)

    build_cmd = base_cmd + [
        '-build',
        '-i', training_set.filename,
        '-time', str(ctrl.ligand_ml.training_time),
        '-y', training_set.truth_col,
        '-smiles_col', training_set.smiles_col,
        '-classification',
        '-NUM_SUBJOBS', str(num_subjobs)
    ]  # yapf: disable

    if cfg.ligand_ml_train_host:
        build_cmd += ['-HOST', cfg.ligand_ml_train_host]
    if cfg.ligand_ml_driver_host:
        build_cmd += ['-DRIVERHOST', cfg.ligand_ml_driver_host]

    build_job = make_job(build_cmd)

    # placeholder for now:
    # need to either merge all databases (not ideal),
    # or run multiple predictions and merge (needs another job-controllable
    # script in the current paradigm)
    smiles_files = list(cfg.get_smiles_csv(ctrl.databases))
    if not smiles_files:
        return None

    jobname = f'{get_jobname(ctrl)}_ligand_ml'
    outfile = f'{jobname}_pred.csv'

    cpds_list_file = f'{jobname}_cpds'
    with open(cpds_list_file, 'w') as fp:
        fp.write('\n'.join(smiles_files))

    predict_cmd = [
        'run', '-FROM', 'glide', 'glide_active_learning.py', 'evaluate',
        '-model', model,
        '-infile_list_file', cpds_list_file,
        '-name_index', '2',
        '-smi_index', '1',
        '-num_rescore_ligand', '0'
    ]

    if cfg.ligand_ml_eval_host:
        predict_cmd += ['-HOST', cfg.ligand_ml_eval_host]
    if cfg.ligand_ml_driver_host:
        predict_cmd += ['-DRIVERHOST', cfg.ligand_ml_driver_host]
    if cfg.glide_al_block_size:
        predict_cmd += ['-block_size', str(cfg.glide_al_block_size)]

    predict_job = make_job(predict_cmd)
    predict_job.addPrereq(build_job)

    # skip DiSE for now: need to "glide_sort" the prediction outcomes
    # by ranking_score = score - 1.96*std
    # (another job-controllable script? or modify ligand_ml_driver.py?)

    return JobAndOutcomes(predict_job, [outfile])

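# Dependency chain set up above (illustrative): the evaluation job is queued
# immediately but only starts after the model build succeeds.
#
#   build_job: ligand_ml_driver.py -build ...       -> <jobname>.qzip
#        |
#        v  (addPrereq)
#   predict_job: glide_active_learning.py evaluate -model <jobname>.qzip ...
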
def get_new_filename(root, ext):
    '''
    Returns either `root + ext` (if no such file exists) or a
    non-existing variation of it.
    '''

    fn = root + ext
    n = 0
    while os.path.exists(fn):
        n = n + 1
        fn = f'{root}_{n}{ext}'

    return fn

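# Illustrative behaviour (assuming 'report.csv' and 'report_1.csv' already
# exist in the working directory, while 'report_2.csv' does not):
#
#   >>> get_new_filename('report', '.csv')
#   'report_2.csv'
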
def ensure_decoys(cfg, ctrl):
    '''
    Sample decoys from a pool if not provided via control.

    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`
    '''

    logger.info('actives:\n%s', pprint.pformat(ctrl.actives.to_dict()))

    valid, msg = ctrl.actives.validate()
    if not valid:
        raise RuntimeError(msg)

    if ctrl.decoys:
        valid, msg = ctrl.decoys.validate()
        if not valid:
            raise RuntimeError(msg)
        ctrl.synthetic_decoys = False
    else:
        logger.info('sampling decoys ...')

        # column names from property_matched_decoys.py
        ctrl.decoys = csvsmiles.CsvSmilesFile(
            filename=os.path.abspath(get_new_filename('decoys', '.csv.gz')),
            smiles_col='SMILES',
            cid_col='CID')

        subprocess.run([
            os.path.join(os.getenv('SCHRODINGER'), 'run'),
            'property_matched_decoys.py', 'sample',
            '-smiles_col', ctrl.actives.smiles_col,
            '-n', '100',
            '-outfile', ctrl.decoys.filename,
            ctrl.actives.filename,
            '-pool', cfg.decoys_pool
        ], check=True)

        backend = jobcontrol.get_backend()
        if backend:
            backend.addOutputFile(ctrl.decoys.filename)

        ctrl.synthetic_decoys = True

    logger.info('decoys:\n%s', pprint.pformat(ctrl.decoys.to_dict()))

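# Equivalent command line for the sampling branch above (illustrative;
# 'actives.csv', its 'SMILES' column and the pool path are placeholders):
#
#   $SCHRODINGER/run property_matched_decoys.py sample \
#       -smiles_col SMILES -n 100 -outfile decoys.csv.gz \
#       actives.csv -pool <cfg.decoys_pool>
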
def run_production(cfg, ctrl):
    '''
    Executes "virtual screening" tasks.

    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`
    '''

    jobname = get_jobname(ctrl)
    logger.info('jobname: %s', jobname)
    for dbname in ctrl.databases:
        logger.info('database: %s', dbname)

    if ctrl.glide or ctrl.glide_al or ctrl.ligand_ml:
        ensure_decoys(cfg, ctrl)

    tasks = {
        'glide': make_glide_jobs(cfg, ctrl),
        'glide_al': make_glide_al_jobs(cfg, ctrl),
        'shape': make_shape_jobs(cfg, ctrl),
        'ligand_ml': make_ligand_ml_jobs(cfg, ctrl),
    }

    jobdj = queue.JobDJ(verbosity='verbose')
    for t in tasks.values():
        if t:
            jobdj.addJob(t.job)

    if jobdj.total_added:
        jobdj.run()

    backend = jobcontrol.get_backend()
    for fn in itertools.chain.from_iterable(
            t.outcomes for t in tasks.values() if t):
        logger.info('outcome: %s', fn)
        if backend:
            backend.addOutputFile(fn)

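# Minimal driver sketch (illustrative; how Config and RunnerControl instances
# are actually constructed or deserialized is an assumption not shown here):
#
#   from schrodinger.application.vss import config, control
#
#   cfg = config.Config(...)            # site/installation configuration
#   ctrl = control.RunnerControl(...)   # run specification
#   run_production(cfg, ctrl)           # queues glide / glide_al / shape /
#                                       # ligand_ml jobs and registers outcomes
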
def run_pilot(cfg, ctrl):
    '''
    Executes "virtual screening" pilot.

    :param cfg: VSS runner configuration.
    :type cfg: `schrodinger.application.vss.config.Config`

    :param ctrl: Run specification.
    :type ctrl: `schrodinger.application.vss.control.RunnerControl`
    '''

    jobname = get_jobname(ctrl, 'vs-pilot')
    logger.info('running "pilot"')
    logger.info('jobname: %s', jobname)

    ensure_decoys(cfg, ctrl)

    logger.info('making database ...')
    dbname = get_new_filename(f'{jobname}_db', '')
    shape_type = ctrl.shape.shape_type if ctrl.shape else None
    pilot.make_database(dbname, [ctrl.actives, ctrl.decoys], shape_type)

    ctrl.jobname = jobname
    ctrl.databases = [dbname]
    ctrl.dise = None
    ctrl.glide_al = None
    ctrl.ligand_ml = None

    cfg.databases = [database.Database(dbname)]

    run_production(cfg, ctrl)

# TODO: enrichments => what about "n_report"?