# -*- coding: utf-8 -*-
"""
Stages for running Phase jobs.

Copyright Schrodinger, LLC. All rights reserved.

"""

import os

import schrodinger.utils.subprocess as subprocess
from schrodinger import structure
from schrodinger.job import jobcontrol
from schrodinger.job import util as job_util
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage
from schrodinger.utils import fileutils

PHASE_DATABASE_EXE = os.path.join(os.environ['SCHRODINGER'], 'phase_database')
phase_shape = os.path.join(os.environ["SCHRODINGER"], "shape_screen")

MMSHARE_DATA_DIR = job_util.hunt('mmshare', dir='data')
ALT_FILES = [os.path.join(MMSHARE_DATA_DIR, 'proj_AD_feature.ini')]


def check_subset_existence(dbpath, subsetname):
    """
    If the subset exists, returns the full subset path (absolute file path
    without the "_phase.inp" extension). If it does not exist, raises a
    RuntimeError.
    """
    if dbpath.endswith(".phdb"):
        # New format
        dbdir = dbpath
    else:
        # Old format
        dbdir = os.path.split(dbpath)[0]

    subset_file = os.path.join(dbdir, "%s_phase.inp" % subsetname)
    if not os.path.isfile(subset_file):
        msg = "Subset file for subset '%s' does not exist. Expected '%s'" % (
            subsetname, subset_file)
        raise RuntimeError(msg)
    return os.path.join(dbdir, subsetname)

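# A minimal usage sketch for check_subset_existence(), using a throwaway
# new-format (*.phdb) database directory; the database location and the
# subset name "actives" are hypothetical:
def _example_check_subset_existence():
    import shutil
    import tempfile

    dbpath = tempfile.mkdtemp(suffix='.phdb')
    try:
        # A subset named "actives" is stored as <db>/actives_phase.inp:
        open(os.path.join(dbpath, 'actives_phase.inp'), 'w').close()
        # The return value is the subset path without the "_phase.inp"
        # extension:
        assert check_subset_existence(dbpath, 'actives') == \
            os.path.join(dbpath, 'actives')
    finally:
        shutil.rmtree(dbpath)
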
def run_phase_database_job(cmd, stage):
    """
    Run the given phase_database job. On failure, exits the stage.
    """
    print('RUNNING:', subprocess.list2cmdline(cmd))
    job = jobcontrol.launch_job(cmd)
    stage.info(" JobId: %s" % job.job_id)
    exe = os.path.basename(cmd[0])
    stage.info('Waiting for %s job to complete...' % exe)
    job.wait()

    jobname = job.Name
    logfile = jobname + ".log"
    ok_file = jobname + ".okay"
    err_file = jobname + "_errors.out"

    if not job.succeeded() or not os.path.isfile(ok_file):
        stage.error("ERROR %s job failed to run" % exe)
        # Will print NO LOG FILE if the log file is not present:
        print(pipeutils.get_last_20_lines(logfile))
        stage.exit()

def extract_properties(db, stage, jobname):
    """
    Run phase_database extract in the CWD for the specified database.
    """
    logfile = jobname + ".log"
    cmd = [PHASE_DATABASE_EXE, db, "extract", jobname, '-map']
    cmd.append("-append")  # Generate properties only for the NEW structures

    # Run on a remote host, because the database may not be accessible on
    # localhost:
    host = stage.getHostStr()
    cmd += ['-HOST', host]

    stage.info("\nGenerating the properties database...")
    run_phase_database_job(cmd, stage)
    # Properties database was updated successfully
    return

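# For reference, extract_properties() above launches a command of roughly
# this form (the database path and job name are hypothetical):
#
#   $SCHRODINGER/phase_database /path/to/ligands.phdb extract <jobname> \
#       -map -append -HOST <host>
#
# where -append restricts property generation to the newly added structures.
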
class DBManageStage(stage.Stage):
    """
    Stage for creating a Phase database.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        DATABASE = string() # Save the database to this path (or append to this path)
        NEW = boolean(default=True) # Whether to create a new database or append to an existing one
        MULTIPLE_CONFS = boolean(default=False) # Whether the input has multiple conformers per compound.
        CONSIDER_STEREO = boolean(default=False) # When importing multiple conformers, use connectivity (including stereochemistry) to perceive conformers instead of titles.
        GENERATE_PROPS = boolean(default=True) # Whether to run phase_database to generate the properties database
        CREATE_SUBSET = boolean(default=False) # Whether to create a subset "added_ligands" in the database for the newly added ligands. Works for local databases only.
        SKIP_DUPLICATES = boolean(default=False) # Skip duplicate structures (by SMILES)
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.status = "NOT STARTED"

        # Used by Pipeline to associate -host_phase with this stage:
        self.setMainProduct('phase')

        # Input #1 needs to be of type 'structures':
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, 'phasedb', True)  # Output PhaseDB path

    def operate(self):
        """
        Perform an operation on the input files.

        There are setup, running, and post-processing steps, and the stage
        records its current status so that it can be restarted in that step
        if there is a failure.

        Raises a RuntimeError if the JobDJ run() method fails, or if the
        stage finishes with an improper status.
        """
        # NOTE: Only new-format databases can be appended to.

        # Get the filenames of ligand files specified in input #1:
        input_io = self.getInput(1)
        structure_files = input_io.getFiles()

        inp_db = self['DATABASE']
        if inp_db.endswith("_phasedb"):
            self.exit("Old database format is not supported")

        expanded_db = os.path.expandvars(inp_db)
        if inp_db != expanded_db:
            print('Expanding environment variables in:', inp_db)

        # The DATABASE value is typically specified WITHOUT the *.phdb
        # extension, for backward-compatibility reasons.
        if not expanded_db.endswith(".phdb"):
            expanded_db += ".phdb"
        self.db = expanded_db

        # Phase utilities require an absolute path. If just the name of the
        # database is given, then write the database to the same directory
        # as where the stage is run:
        if not os.path.isabs(self.db):
            # Create the database in the driver dir to avoid copying:
            self.db = os.path.join(self._driver_dir, self.db)

        if self['NEW'] and os.path.isdir(self.db):
            msg = "Database already exists: %s" % self.db
            msg += "\nExiting..."
            self.exit(msg)

        print('Output database path:', self.db)

        # Verify file formats:
        for ligfile in structure_files:
            fileformat = fileutils.get_structure_file_format(ligfile)
            if fileformat not in ["maestro", "sd"]:
                msg = "ERROR: Unsupported ligand file format: %s" % ligfile
                self.exit(msg)

        host_str = self.getHostStr()

        # Determine whether to use the multi-ligand backend (PANEL-1713):
        ncpus = self.getNCpus()

        # Arguments to add when creating a new (first) database:
        new_args = ['-new', '-alt', ','.join(ALT_FILES)]

        # The multi backend doesn't support -multi and -unique (PHASE-1558,
        # PHASE-1559):
        if (ncpus >= 2 and not self['MULTIPLE_CONFS'] and
                not self['SKIP_DUPLICATES']):
            # Write the structure files to a list file:
            phase_jobname = self.stagename + '_phase'
            list_file = phase_jobname + '.list'
            with open(list_file, 'w') as fh:
                for ligfile in structure_files:
                    fh.write(ligfile + '\n')

            cmd = [
                PHASE_DATABASE_EXE, self.db, "splice", list_file, '-JOB',
                phase_jobname
            ]
            if self['NEW']:
                cmd += new_args
            if ncpus == len(structure_files):
                # Save even more time by not having phase_database splice
                # re-combine the files, if we are already lucky enough to
                # have the right number of files:
                cmd.append('-nosplit')
            cmd += ['-HOST', host_str]
            self.runAddJob(cmd)
        else:
            # Run a phase_database backend for each structure file:
            for fileindex, ligfile in enumerate(structure_files):
                phase_jobname = self.genFileName(filenum=fileindex + 1)
                cmd = [
                    PHASE_DATABASE_EXE, self.db, "import", phase_jobname,
                    "-i", ligfile
                ]
                if self['NEW'] and fileindex == 0:
                    # Create the new database only with the first subjob:
                    cmd += new_args
                if self['MULTIPLE_CONFS']:
                    # Treat each input structure as a conformer:
                    cmd.append("-multi")
                    if self['CONSIDER_STEREO']:
                        # Use connectivity (including stereochemistry)
                        # instead of titles for grouping input conformers:
                        cmd.append("-stereo")
                if self["SKIP_DUPLICATES"]:
                    cmd += ['-unique']
                cmd += ['-HOST', host_str]
                self.runAddJob(cmd)

        # Check for existence of the database only when running on localhost
        # (the local host may have no access to that directory); the number
        # of CPUs doesn't matter here:
        if self.areSubjobsAndStageOnSameHost():
            if not os.path.isdir(self.db):
                self.exit(
                    "ERROR phase_database done; yet database missing: %s" %
                    self.db)

        # Create a subset for the added ligands, if requested:
        if self['CREATE_SUBSET']:
            # Ev:91949 Create a subset of the ligands that were just added.
            # Will overwrite the previous subset file with this name.
            # The phase_database utility creates a file called
            # <jobname>_new_phase.inp for the compounds that it just added.
            # Copy this file into the database directory so that it shows up
            # as a subset in the Manage Phase DB panel.
            subsetname = "added_ligands"
            dbdir = self.db
            subset_file = os.path.join(dbdir, "%s_phase.inp" % subsetname)
            prev_subset_file = "%s_new_phase.inp" % phase_jobname
            if not os.path.isfile(prev_subset_file):
                self.error(
                    "phase_database failed to create a subset file: %s" %
                    prev_subset_file)
                self.exit()
            # Move the subset file into the database directory:
            try:
                fileutils.force_rename(prev_subset_file, subset_file)
            except OSError:
                self.warning("CREATE_SUBSET option is not supported for "
                             "remote databases; subset will not be created.")

        if self['GENERATE_PROPS']:
            jobname = "phase_database_extract"
            try:
                extract_properties(self.db, self, jobname)
            except RuntimeError as err:
                self.error(err)
                self.exit()

        # NOTE: Job control does not support copying directories.
        self.setOutput(1, pipeio.PhaseDB(self.db, remote=True))
        return

    def runAddJob(self, cmd):
        """
        Run a phase_database job. Will also check the log file and exit the
        stage if the Phase job failed.
        """
        self.info("\nAdding structures to the database...")
        run_phase_database_job(cmd, self)

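# A sketch of a Pipeline input block that would drive DBManageStage,
# written against the configspec in DBManageStage.__init__ above. The
# stage name and variable names are hypothetical, and the surrounding
# syntax is assumed to follow the usual Pipeline INI format:
#
#   [DBMANAGE]
#   STAGECLASS      = schrodinger.pipeline.stages.phase.DBManageStage
#   INPUTS          = LIGANDS
#   OUTPUTS         = PHASEDB
#   DATABASE        = my_database    # ".phdb" is appended automatically
#   NEW             = True
#   SKIP_DUPLICATES = False
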
class DBConfSitesStage(stage.Stage):
    """
    Stage for generating conformations for structures in a Phase database.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        SITES = boolean(default=False) # Create pharmacophore sites for existing conformers (don't generate new conformers).
        CONFS = option('all', 'auto', default=None) # Generate conformers for all molecules ('all') or only for molecules that don't already have them ('auto').
        MAX_CONFS = integer(default=100)
        MINIMIZE_FFLD = option('OPLS3e', 'OPLS_2005', default=None) # Force field to use during conformer minimization (by default, no minimization is done).
        MAX_PER_ROT_BOND = integer(default=10)
        AMIDE_MODE = option('vary', 'orig', 'trans', default='trans')
        ENERGY_WINDOW = float(default=25.0)
        SUBSET = string(default=None) # Subset to run phase_database on (subset name in the database). Local databases only. Deprecated in favor of SUBSET_FILE.
        SUBSET_FILE = string(default=None) # Subset file to run the phase_database revise task on.
        SKIP_COMPOUNDS_WITH_SITES = boolean(default=False) # Add sites (and optionally conformers) only to those compounds that don't already have sites.
        SAMPLE_METHOD = option('rapid', 'thorough', default='rapid') # Conformational sampling method.
        GENERATE_PROPS = boolean(default=True) # Whether to extract properties afterwards.
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.status = "NOT STARTED"

        # Used by Pipeline to associate -host_phase with this stage:
        self.setMainProduct('phase')

        self.addExpectedInput(1, "phasedb", True)

    def operate(self):
        """
        Perform an operation on the input files.

        There are setup, running, and post-processing steps, and the stage
        records its current status so that it can be restarted in that step
        if there is a failure.

        Raises a RuntimeError if the JobDJ run() method fails, or if the
        stage finishes with an improper status.
        """
        if self.status == "NOT STARTED":
            self.master_jobname = self.genFileName()
            db = self.getInput(1).getPath()

            # Check for existence of the database only when running on
            # localhost:
            if self.areSubjobsAndStageOnSameHost():
                if not os.path.exists(db):
                    raise RuntimeError(f'Database {db} does not exist')

            # Determine what subset of the database to run on (if any):
            subset_file = None
            if self['SUBSET'] and self['SUBSET_FILE']:
                raise RuntimeError(
                    'SUBSET and SUBSET_FILE keywords are mutually exclusive.')
            if self['SKIP_COMPOUNDS_WITH_SITES']:
                if self['SUBSET'] or self['SUBSET_FILE']:
                    raise RuntimeError(
                        'SKIP_COMPOUNDS_WITH_SITES is not compatible with '
                        'SUBSET or SUBSET_FILE keywords')
                jobname = self.master_jobname + '_subset'
                subset_file = self.createSubsetForCompsWithoutSites(
                    db, jobname)
            elif self['SUBSET']:
                subsetname = self['SUBSET']
                # The input is a subset name, for a subset file that is
                # located in the database. Works for local databases only.
                subset_file = os.path.join(db, "%s_phase.inp" % subsetname)
            elif self['SUBSET_FILE']:
                subset_file = self['SUBSET_FILE']

            # Run the command to generate sites (and optionally conformers):
            maxconfs = str(self['MAX_CONFS'])
            maxperbond = self['MAX_PER_ROT_BOND']
            if maxperbond is None:
                # Not specified (the default doesn't work):
                maxperbond = '10'
            else:
                maxperbond = str(maxperbond)
            amide = self['AMIDE_MODE']
            ewin = str(self['ENERGY_WINDOW'])
            samplemethod = self['SAMPLE_METHOD']

            # Build up the command string:
            jobname = self.master_jobname + '_revise'
            cmd = [PHASE_DATABASE_EXE, db, "revise", jobname]
            if self['SITES']:
                self.info("Generating sites for existing conformers")
                cmd.append('-sites')
            else:
                self.info("Generating conformers and sites")
                cmd += [
                    '-confs', self['CONFS'], '-max', maxconfs, '-bf',
                    maxperbond, '-amide', amide, '-ewin', ewin, '-sample',
                    samplemethod
                ]
                if self['MINIMIZE_FFLD']:
                    cmd += ['-force_field', self['MINIMIZE_FFLD']]
            cmd += ['-HOST', self.getHostStr()]

            if subset_file is not None:
                if not os.path.isfile(subset_file):
                    raise RuntimeError(
                        "Subset file does not exist: %s" % subset_file)
                # We must specify the subset file without the _phase.inp
                # extension:
                cmd += ['-isub', subset_file[:-10]]

            # If restarting, go directly to the next step.
            # FIXME: non-functional
            cmd.append('-NOJOBID')

            self.status = "RUNNING JOBS"
            self.dump()

            run_phase_database_job(cmd, self)

            if self['GENERATE_PROPS']:
                self.status = "GENERATING PROPS"
            else:
                self.status = "COMPLETE"
            self.dump()

        if self.status == "GENERATING PROPS":
            db = self.getInput(1).getPath()
            jobname = self.master_jobname + '_extract'
            try:
                extract_properties(db, self, jobname)
            except RuntimeError as err:
                self.error(err)
                self.exit()
            self.status = "COMPLETE"
            self.dump()

        # COMPLETE
        return

    def createSubsetForCompsWithoutSites(self, db, jobname):
        """
        Run a "subset" task to create a subset of the compounds in the
        database that don't already have sites. Must run on the host that
        has access to the database.
        """
        subset_file = "no_sites_phase.inp"
        fileutils.force_remove(subset_file)

        # Generate a subset file for all compounds that don't have sites:
        cmd = [
            PHASE_DATABASE_EXE, db, 'subset', jobname, '-has', 'sites',
            '-false', '-osub', 'no_sites', '-HOST', self.getHostStr()
        ]
        self.info("\nCreating subset file for compounds without sites...")
        run_phase_database_job(cmd, self)

        if not os.path.isfile(subset_file):
            self.exit(
                'ERROR: phase_database subset task failed to create a '
                'subset file: %s' % subset_file)
        return subset_file

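# For reference, when SITES is False, DBConfSitesStage.operate() above
# builds a revise command of roughly this shape (the database path is
# hypothetical; CONFS = auto is an example value, and the remaining numbers
# are the configspec defaults):
#
#   $SCHRODINGER/phase_database /path/to/db.phdb revise <jobname>_revise \
#       -confs auto -max 100 -bf 10 -amide trans -ewin 25.0 \
#       -sample rapid -HOST <host>
#
# With SITES = True, the conformer options are replaced by a single
# -sites flag.
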
class DBExportStage(stage.Stage):
    """
    Stage for exporting structures from a Phase database.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        OUTFORMAT = string(default='maestro') # Output format ("sd" or "maestro")
        SUBSET = string(default="None") # Subset name (if any; the subset file must be in the database directory)
        SUBSET_FILE = string(default=None) # Subset file (if any)
        MAX_CONFS = integer(default=None) # Maximum number of conformations to extract for each database molecule.
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.status = "NOT STARTED"

        # Used by Pipeline to associate -host_phase with this stage:
        self.setMainProduct('phase')

        self.addExpectedInput(1, 'phasedb', True)  # Input Phase database
        self.addExpectedOutput(1, "structures", True)  # Output structures

    def findManageOutfiles(self, out_basename, outext):
        output_files = []
        # There should never be more than a thousand output files:
        for i in range(1, 1000):
            expected_file = '%s_%i.%s' % (out_basename, i, outext)
            if os.path.isfile(expected_file):
                output_files.append(expected_file)
            else:
                break
        return output_files

    def operate(self):
        """
        Perform an operation on the input files.
        """
        # Get the path of the Phase database specified in input #1:
        dbpath = self.getInput(1).getPath()
        expanded_db = os.path.expandvars(dbpath)
        if dbpath != expanded_db:
            print('Expanding environment variables in:', dbpath)
            dbpath = expanded_db
        print('Input database path:', dbpath)

        if not dbpath.endswith(".phdb"):
            msg = "Old database format is not supported"
            self.exit(msg)

        outformat = self["OUTFORMAT"]
        if outformat in ["mae", "mae.gz", "maegz"]:
            outformat = "maestro"
        elif outformat in ["sdf", "sdf.gz", "sdfgz"]:
            outformat = "sd"

        if outformat == "sd":
            outext = "sdfgz"
        elif outformat == "maestro":
            outext = "maegz"
        else:
            msg = "Invalid OUTFORMAT value: %s" % self["OUTFORMAT"]
            raise RuntimeError(msg)

        # Must be an absolute path:
        out_basename = os.path.abspath(self.genOutputFileName(1))

        jobname = self.genFileName() + '-export'
        logfile = jobname + '.log'

        # Delete any previous output files:
        output_files = self.findManageOutfiles(out_basename, outext)
        if output_files:
            print('Deleting output files from previous run...')
            for filename in output_files:
                os.remove(filename)

        cmd = [PHASE_DATABASE_EXE, dbpath, "export", jobname]
        if outformat == 'maestro':
            cmd += ['-omae', out_basename]
        elif outformat == 'sd':
            cmd += ['-osd', out_basename]
        else:
            raise ValueError("invalid outformat")

        max_confs = self.get('MAX_CONFS')
        print('MAX_CONFS:', max_confs)
        if max_confs is not None:
            cmd += ['-get', str(max_confs)]

        if self['SUBSET'] != 'None' or self['SUBSET_FILE']:
            if self['SUBSET'] != 'None':
                subsetname = self['SUBSET']
                print('SUBSET name:', subsetname)
                subset_file = check_subset_existence(dbpath, subsetname)
                cmd += ['-isub', subset_file]
            elif self['SUBSET_FILE']:
                subset_file = self['SUBSET_FILE']
                print('SUBSET_FILE:', subset_file)
                if not os.path.isfile(subset_file):
                    msg = "Subset file does not exist: '%s'" % subset_file
                    raise RuntimeError(msg)
                if subset_file.endswith("_phase.inp"):
                    # Strip the "_phase.inp" extension:
                    subset_file = subset_file[:-10]
                cmd += ['-isub', subset_file]
            else:
                raise ValueError

        # -limit can be used to specify the maximum number of structures
        # per file. The default is 100,000.

        print('RUNNING:', subprocess.list2cmdline(cmd))
        job = jobcontrol.launch_job(cmd)
        job.wait()

        # Process the log file:
        out_st_count = None
        if os.path.isfile(logfile):
            ok = False
            for line in pipeutils.BackwardsReader(logfile):
                if 'phase_database successfully completed' in line:
                    print('Job completed successfully. Log file:', logfile)
                    ok = True
                if "Total number of structures exported =" in line:
                    out_st_count = int(line.split("=")[1].strip())
                    break
            if not ok:
                self.error("ERROR phase_database export job failed to run")
                print(pipeutils.get_last_20_lines(logfile))
                self.exit()
            if out_st_count is None:
                self.error(
                    "ERROR Could not determine how many structures "
                    "phase_database export wrote out")
                print(pipeutils.get_last_20_lines(logfile))
                self.exit()
        else:
            self.exit(
                "ERROR phase_database export job failed and log file was "
                "not produced.")

        # Find the output files. Since the Pipeline infrastructure moves
        # files around, the output files must be specified as local paths:
        out_basename = os.path.basename(out_basename)
        output_files = self.findManageOutfiles(out_basename, outext)
        if len(output_files) == 0:
            self.exit(
                "Could not find output files from phase_database export")

        self.setOutput(1, pipeio.Structures(output_files, out_st_count))
        return

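# A standalone sketch (standard library only) of the log scan performed at
# the end of DBExportStage.operate() above, reading forward rather than
# backwards; the log file name here is hypothetical:
def _example_parse_export_count(logfile='export_job.log'):
    """Return the exported-structure count from the log, or None."""
    count = None
    with open(logfile) as fh:
        for line in fh:
            # phase_database export reports a line of the form:
            #   Total number of structures exported = 1234
            if "Total number of structures exported =" in line:
                count = int(line.split("=")[1].strip())
    return count
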
class PhaseShapeStage(stage.Stage):
    """
    Stage for running Phase Shape on the input ligands.

    This stage is used by the Data Fusion workflow (data_fusion_backend.py).

    Input 1: Shape query structure
    Input 2: Ligand structures
    Output 1: Resulting poses
    """
    def __init__(self, *args, **kwargs):
        """
        Creates the stage instance, and passes the <args> and <kwargs> to
        the stage.Stage constructor.
        """
        specs = """
        ATOM_TYPES = option("none", "mmod", "element", "qsar", "pharm", default="pharm")
        GENERATE_CONFS = boolean(default=True)
        EXISTING_CONFS = option("keep", "discard", default="discard")
        MAX_CONFS = integer(default=100)
        CONFS_PER_BOND = integer(default=10)
        AMIDE_MODE = option("vary", "orig", "trans")
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)

        # Used by Pipeline to associate -host_phase with this stage:
        self.setMainProduct('phase')

        # Input pin #1 (shape query structure):
        self.addExpectedInput(1, "structures", True)
        # Input pin #2 (ligand structures):
        self.addExpectedInput(2, "structures", True)
        # Output pin #1 must be of type "structures" and is always produced:
        self.addExpectedOutput(1, "structures", True)

    def operate(self):
        """
        The only overridden & required method in this class. Called by the
        Pipeline to run this stage's main code.
        """
        # Get the file for input pin #1:
        query_files = self.getInput(1).getFiles()
        if len(query_files) != 1:
            raise RuntimeError(
                "This stage requires exactly one query structure file")
        query_file = query_files[0]
        if not os.path.isfile(query_file):
            msg = "Query structure file is missing: " + query_file
            raise RuntimeError(msg)

        # Get the file for input pin #2:
        screen_files = self.getInput(2).getFiles()
        if len(screen_files) != 1:
            raise RuntimeError(
                "This stage requires exactly one screening structure file")
        screen_file = screen_files[0]

        phase_jobname = self.genFileName()

        cmd = [phase_shape]
        cmd += ['-shape', query_file]
        cmd += ['-screen', screen_file]
        cmd += ['-JOB', phase_jobname]

        atom_types = self["ATOM_TYPES"]
        if atom_types != "pharm":
            if atom_types != "none":
                cmd += ['-atomTypes', atom_types]
                # Report similarities with and without atom typing:
                cmd += ['-dual']
        else:
            cmd.append("-pharm")

        if self["GENERATE_CONFS"]:
            cmd += ['-flex']
            cmd += ['-flexMaxConfs', str(self["MAX_CONFS"])]
            cmd += ['-flexConfsPerBond', str(self["CONFS_PER_BOND"])]
            if self["EXISTING_CONFS"] == "keep":
                # Keep existing conformers, appending newly generated ones:
                cmd += ['-flexAppend']
            amidemode = self["AMIDE_MODE"]
            cmd += ['-flexAmideOption', amidemode]

        cmd += ['-NO_CHECKPOINT']
        # FIXME to support restarting:
        # cmd += ['-CHECKPOINT', checkpoint_dir]

        host_str = self.getHostStr()
        if not self["GENERATE_CONFS"]:
            # Workaround for PHASE-1150 (see PYAPP-5205):
            host_str = host_str.split(":")[0]
        cmd += ['-HOST', host_str]

        # phase_shape will complain if multiple CPUs are specified without
        # the -split option. There is no simple way to pass the NJOBS
        # option to it, though.
        total_cpus = self.getNCpus()
        if self["GENERATE_CONFS"] and total_cpus > 1:
            cmd += ['-split']

        self.info("Launching Phase Shape job...")
        self.debug('\nRUNNING: %s\n' % subprocess.list2cmdline(cmd))

        try:
            job = jobcontrol.launch_job(cmd)
        except Exception as err:
            msg = 'ERROR: Failed to launch the phase_shape job.'
            msg += '\nError: %s' % str(err)
            raise RuntimeError(msg)

        self.info(" JobId: %s" % job.job_id)
        self.info("Waiting for the job to finish...")
        job.wait()

        if not job.succeeded():
            raise RuntimeError("Phase Shape job failed")

        screen_results_file = phase_jobname + "_align.maegz"
        if not os.path.isfile(screen_results_file):
            self.error("Missing output file: %s" % screen_results_file)
            raise RuntimeError("Phase Shape output file does not exist")

        self.info("Phase Shape job completed successfully.")
        num_sts = structure.count_structures(screen_results_file)
        self.info("Number of output structures: %d" % num_sts)

        # Set output #1:
        self.setOutput(1, pipeio.Structures([screen_results_file], num_sts))

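# For reference, with the configspec defaults (and AMIDE_MODE = trans),
# PhaseShapeStage.operate() above assembles a shape_screen invocation of
# roughly this form (file names are hypothetical):
#
#   $SCHRODINGER/shape_screen -shape query.maegz -screen ligands.maegz \
#       -JOB <jobname> -pharm -flex -flexMaxConfs 100 \
#       -flexConfsPerBond 10 -flexAmideOption trans -NO_CHECKPOINT \
#       -HOST <host>
#
# plus -split when more than one CPU is available and conformers are being
# generated.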