Source code for schrodinger.application.desmond.queue

import os
import time
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple

from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobcontrol_queue

# File names for automatic checkpointing
CHECKPOINT_REQUESTED_FILENAME = 'CHECKPOINT_REQUESTED'
CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME = 'CHECKPOINT_WITH_RESTART_REQUESTED'

Host = Tuple[str, Optional[int]]
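# A Host pairs a host name with an optional job-slot count, e.g.
# ('localhost', 4) or ('localhost', None); these values are illustrative.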


class Queue:
    def __init__(self,
                 hosts: str,
                 max_job: int,
                 max_retries: int,
                 periodic_callback=None):
        """
        :param hosts: string passed to -HOST.
        :param max_job: Maximum number of jobs to run simultaneously.
        :param max_retries: Maximum number of times to retry a failed job.
        :param periodic_callback: Function to call periodically as the jobs
            run. This can be used to handle the halt message for stopping a
            running workflow.
        """
        self.hosts = _parse_hosts(hosts, max_job)
        self.max_retries = max_retries
        self.jobdj = None
        self.periodic_callback = periodic_callback
        self._queued_jobs = []
        self._max_stop_time = 3600

    def run(self):
        """
        Run jobs for all multisim stages.

        Starts a separate JobDJ for each multisim stage::

            queue.push(jobs)
            queue.run()
              while jobs:        <---------------|
                  jobdj.run()                    |
                  multisim_jobs.finish()         |
                  stage.capture()                |
                  next_stage.push()              |
                  next_stage.release()           |
                  queue.push(next_jobs) ----------
        """
        while self._queued_jobs:
            self._run_stage(self._queued_jobs)

    def stop(self) -> int:
        """
        Attempt to stop the subjobs, but kill them if they do not stop in
        time.

        :return: Number of subjobs killed due to a failure to stop.
        """
        stop_time = time.time()
        num_subjobs_killed = 0
        # New jobs may be launched, so loop until
        # no more jobs are running.
        stopped_jobs = set()
        while self.running_jobs:
            for job in self.running_jobs:
                jctrl = job.getJob()
                if time.time() - stop_time > self._max_stop_time:
                    num_subjobs_killed += 1
                    jctrl.kill()
                elif job not in stopped_jobs:
                    stopped_jobs.add(job)
                    jctrl.stop()
        # Process the finished jobs
        self._finish_stage()
        return num_subjobs_killed

    def push(self, jobs: List["cmj.Job"]):  # noqa: F821
        self._queued_jobs.extend(
            filter(lambda j: j.jlaunch_cmd is not None, jobs))

    @property
    def running_jobs(self) -> List["JobAdapter"]:
        if self.jobdj is None:
            return []
        return [
            j for j in self.jobdj.active_jobs
            if j.getJob() and not j.getJob().isComplete()
        ]

    def _run_stage(self, jobs: List["JobAdapter"]):
        """
        Launch JobDJ for a given set of jobs.
        """
        # TODO: max_failures=jobcontrol_queue.NOLIMIT matches the current
        # behavior, but in some cases it would be better to just exit on the
        # first failure.
        self.jobdj = jobcontrol_queue.JobDJ(
            hosts=self.hosts,
            verbosity="normal",
            job_class=JobAdapter,
            max_failures=jobcontrol_queue.NOLIMIT,
            max_retries=self.max_retries)
        # The host running this often does not have a GPU,
        # so smart distribution should be disabled.
        self.jobdj.disableSmartDistribution()
        # Run all jobs for this stage
        while jobs:
            # Add the jobs, linking to the multisim jobs
            for job in jobs:
                self.jobdj.addJob(
                    job.jlaunch_cmd, multisim_job=job, command_dir=job.dir)
            # Clear the queue; new jobs will be added if the current jobs
            # are requeued.
            self._queued_jobs = []
            try:
                self.jobdj.run(
                    status_change_callback=self._status_change_callback,
                    periodic_callback=self.periodic_callback,
                    callback_interval=60)
            except RuntimeError:
                # Use multisim error handling for failed jobs
                pass
            # Run any requeued jobs
            jobs = self._queued_jobs
        # Finish the stage and add new jobs (if any) to self._queued_jobs
        self._finish_stage()

    def _finish_stage(self):
        from schrodinger.application.desmond.cmj import JobStatus
        for job in self.jobdj.all_jobs:
            jctrl = job.getJob()
            if jctrl is None:
                # Check for jobs that never ran
                job.multisim_job.status.set(JobStatus.LAUNCH_FAILURE)
                continue
            # Skip intermediate jobs that were checkpointed
            if job.is_checkpointed:
                continue
            job.multisim_job.process_completed_job(jctrl)
            job.multisim_job.finish()

    def _status_change_callback(self, job: "JobAdapter"):
        """
        Process the job on a status change.
        """
        from schrodinger.application.desmond.cmj import JobStatus
        if job.state == jobcontrol_queue.JobState.WAITING:
            job.multisim_job.status.set(JobStatus.WAITING)
        elif job.state == jobcontrol_queue.JobState.ACTIVE:
            job.multisim_job.status.set(JobStatus.RUNNING)
        elif job.state == jobcontrol_queue.JobState.FAILED_RETRYABLE:
            job.multisim_job.status.set(JobStatus.RETRIABLE_FAILURE)
        elif job.state in (jobcontrol_queue.JobState.DONE,
                           jobcontrol_queue.JobState.FAILED):
            if job.state == jobcontrol_queue.JobState.DONE:
                jctrl = job.getJob()
                for out_fname in jctrl.getOutputFiles():
                    out_name = Path(out_fname).name
                    if out_name == CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME:
                        job.multisim_job.process_completed_job(
                            jctrl, restart_requested=True)
                        job.multisim_job.requeue(jctrl)
                        job.is_checkpointed = True
                        self.push([job.multisim_job])
                        return
                    elif out_name == CHECKPOINT_REQUESTED_FILENAME:
                        job.multisim_job.process_completed_job(
                            jctrl, checkpoint_requested=True)
                        job.multisim_job.finish()
                        job.is_checkpointed = True
                        return
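

# Illustrative sketch, not part of the original module: a minimal driver
# loop built on Queue, following the diagram in Queue.run(). The function
# name, the host string, and the max_job/max_retries values are hypothetical
# placeholders; real callers pass multisim cmj.Job objects whose jlaunch_cmd
# is already set.
def _example_run_stages(stage_jobs, check_for_halt=None):
    # check_for_halt (hypothetical) is invoked every 60 seconds while
    # subjobs run, e.g. to react to a halt message.
    queue = Queue(hosts='localhost',
                  max_job=4,
                  max_retries=2,
                  periodic_callback=check_for_halt)
    queue.push(stage_jobs)  # only jobs with a jlaunch_cmd are queued
    queue.run()  # one JobDJ per stage; loops while stages requeue jobs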


class JobAdapter(jobcontrol_queue.JobControlJob):
    def __init__(self, *args, multisim_job=None, **kwargs):
        self.multisim_job = multisim_job
        # Set to True if this job is checkpointed by the auto restart
        # mechanism
        self.is_checkpointed = False
        if launch_timeout := os.getenv('SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT'):
            kwargs['launch_timeout'] = int(launch_timeout)
        else:
            kwargs['launch_timeout'] = 1800
        super().__init__(*args, **kwargs)

    def getCommand(self) -> List[str]:
        # Restart command has priority over the original command
        return self.multisim_job.jlaunch_cmd

    def maxFailuresReached(*args, **kwargs):
        # Use multisim failure reporting for now
        pass
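

# Illustrative sketch, not part of the original module: the launch timeout
# read in JobAdapter.__init__ can be overridden from the environment before
# JobDJ constructs the adapters. The 3600-second value is an arbitrary
# example; without the variable the default is 1800 seconds.
def _example_override_launch_timeout():
    # Any JobAdapter created after this point waits up to 3600 seconds
    # (instead of 1800) for its subjob to launch.
    os.environ['SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT'] = '3600'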


def _parse_hosts(hosts: str, max_job: int) -> List[Host]:
    """
    Parse the hosts while also respecting the given max_job.
    See `Queue` for the meaning of the arguments.

    :return: List of Host tuples.
    """
    # Handle multiple hosts, e.g. "localhost localhost"
    split_hosts = hosts.strip().split()
    if len(split_hosts) > 1:
        # Only running on the same host is supported
        if len(set(split_hosts)) == 1:
            hosts = f'{split_hosts[0]}:{len(split_hosts)}'
        else:
            raise ValueError("Different hosts are no longer supported. "
                             "All jobs must be run on the same host.")

    split_host = hosts.split(':')
    host = split_host[0]
    if max_job:
        # Max job takes priority if specified
        hosts = f'{host}:{max_job}'
    else:
        # For non-queue hosts, if neither max_job nor the number of cpus is
        # specified, set it to the number of processors. This is different
        # from JobDJ, which sets it to 1.
        host_entry = jobcontrol.get_host(host)
        if len(split_host) == 1 and not host_entry.isQueue():
            hosts = f'{host}:{host_entry.processors}'
    return jobcontrol.host_str_to_list(hosts)
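

# Illustrative sketch, not part of the original module: expected shapes of
# _parse_hosts() results, assuming jobcontrol.host_str_to_list() maps
# "name:n" to [(name, n)] as the Host alias suggests. The host names are
# hypothetical, and the no-max_job cases consult the local hosts file via
# jobcontrol.get_host(), so treat these as approximate expectations rather
# than a doctest.
def _example_parse_hosts():
    # max_job takes priority over any count already in the host string;
    # expected to give roughly [('localhost', 2)]
    capped = _parse_hosts('localhost:8', max_job=2)
    # repeated identical hosts collapse into a single entry with a count;
    # expected to give roughly [('localhost', 2)] as well
    collapsed = _parse_hosts('localhost localhost', max_job=0)
    # different hosts are rejected outright
    try:
        _parse_hosts('gpu_node cpu_node', max_job=0)
    except ValueError:
        pass
    return capped, collapsed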