Source code for schrodinger.stepper.cloud.aws_sqs

"""
AWS SQS API library.

Simple Queue Service: works primarily as publisher & subscriber model.

The available methods here are modeled on GCP's PubSub service. The outline and
terminology are similar to `pubsub.py` and the actions available to users match
the invocation from stepper.py.

Terminology:
+-------------------------+--------------------------+
|          PubSub         |           SQS            |
+-------------------------+--------------------------+
| Topic                   | Queue                    |
+-------------------------+--------------------------+
| Subscriber              | N/A (general API call)   |
+-------------------------+--------------------------+
| Push                    | Send Messages            |
+-------------------------+--------------------------+
| Pull                    | Receive Messages         |
+-------------------------+--------------------------+
| Acknowledge             | Delete Message           |
+-------------------------+--------------------------+
| Acknowledgment Deadline | Visibility Timeout       |
+-------------------------+--------------------------+
| Message -> 1 entry      | Message -> 1 parcel =    |
|                         | _MSG_PARCEL_SIZE entries |
+-------------------------+--------------------------+

#===============================================================================
# Actions:
#===============================================================================

The following high-level methods are available:
1. List all topics available in current user's account.
2. Clear all topics in current user's account.
3. Create new topic (also creates a dead-letter topic and deletes both if they
already exist).
4. Upload a file to a topic by distributing contents into messages.
5. Download messages from a topic into a file.
6. Process messages from an input topic by running a step task and upload
the result into an output topic.

#===============================================================================
# AWS Services
#===============================================================================

For AWS credential configuration see
`schrodinger.stepper.cloud.aws_client.get_client()` docstring.

#===============================================================================
# Message Parcelling
#===============================================================================

Multiple entries / inputs are parcelled into a single message in an SQS topic /
queue. The number of entries to parcel is determined by the `_MSG_PARCEL_SIZE`
constant. Message parcelling is performed to avoid hitting SQS' 120k in-flight
messages limit. A message is considered in-flight when an SQS queue has sent a
message but the consumer or subscriber has yet to acknowledge the message, and
the message's acknowledgement deadline (visibility timeout) has yet to expire.

There are two workflow patterns when processing these messages inside
`aws_sqs.pull_and_process`:

1. `batch_size` < `_MSG_PARCEL_SIZE`:

When the requested `batch_size` is smaller than the default message parcel
size, a single message's body is broken down into smaller parcels of size
`batch_size` and republished to the input topic. Note that some messages might
contain parcels that are smaller than the requested `batch_size` and thus,
the workers must be designed to handle this case too.

2. `batch_size` >= `_MSG_PARCEL_SIZE`:

When the requested `batch_size` is greater than or equal to the default message
parcel size, a single or multiple messages are consumed and processed. Note
that if the requested `batch_size` is not a multiple of the default message
parcel size, the number of entries processed by the worker will be greater than
the `batch_size`. Thus, workers must be designed to handle inputs larger than
the requested `batch_size`.

Outputs:

All step outputs are also parcelled into size `_MSG_PARCEL_SIZE` and uploaded
to the requested output topic. Note that some messages may contain parcels
smaller than `_MSG_PARCEL_SIZE`, and thus, the consumer of the output topic
must be able to handle this discrepancy.
"""
import datetime
import json
import math
import more_itertools
import os
import subprocess
import time

from schrodinger.stepper.cloud import aws_client

builtin_print = print


def print(*args, **kwargs):
    """
    Timestamped replacement for the builtin `print`.

    We override the builtin print in order to add timestamps to debugging
    statements. The current local time is emitted as the first printed value,
    followed by `args`.

    :param args: values to print after the timestamp

    :param kwargs: keyword arguments forwarded unchanged to the builtin
        `print` (e.g. `sep`, `end`, `file`, `flush`)
    """
    builtin_print(str(datetime.datetime.now()), *args, **kwargs)
# ==============================================================================
# CONSTANTS
# ==============================================================================
# Module-level caches, filled in lazily by `_get_client` / `_get_resource`.
CLIENT = None
RESOURCE = None

_DEFAULT_MAX_MESSAGES = 10
_AWS_MAX_MESSAGES = 10
# number of messages to publish per aws request before we hit AWS max size
# limit of 256kb
_PUBLISH_MAX_MESSAGES = 2
# number of messages to pull per aws request; note this is equivalent to
# 5*_MSG_PARCEL_SIZE entries
_PULL_MAX_MESSAGES = 5
# number of entries per message
_MSG_PARCEL_SIZE = 1000
# total size of entries per message in memory
_MAX_MSG_PARCEL_SIZE_IN_KB = 120.0
# seconds
_RECEIVE_TIMEOUT = 150
# % of msgs to check for failure to upload before raising an error
_PUBLISH_FAILURE_PERCENTAGE = 20

# ==============================================================================
# AWS Client
# ==============================================================================


def _get_client():
    """
    Return the cached SQS client, creating it through
    `aws_client.get_client('sqs')` on first use.
    """
    global CLIENT
    if CLIENT is not None:
        return CLIENT
    CLIENT = aws_client.get_client('sqs')
    return CLIENT


def _get_resource():
    """
    Return the cached SQS resource, creating it through
    `aws_client.get_resource('sqs')` on first use.
    """
    global RESOURCE
    if RESOURCE is not None:
        return RESOURCE
    RESOURCE = aws_client.get_resource('sqs')
    return RESOURCE


# ==============================================================================
# AWS API Utility Methods
# ==============================================================================
def create_topic(topic_name):
    """
    Create a new topic together with its companion dead-letter topic.

    The dead-letter topic is created first so that its ARN can be placed in
    the redrive policy of the main topic; that policy is configured with a
    `maxReceiveCount` of 10. Queue creation is delegated to `create_queue`,
    which deletes and re-creates a queue when AWS reports that the name
    already exists, hence any messages in such a queue are lost forever.

    :param topic_name: name used to create new topic and as prefix to
        dead-letter topic.
    :type topic_name: str
    """
    # dead-letter queue:
    dlq_name = get_dead_letter_topic_name(topic_name)
    dlq_response = create_queue(dlq_name, queue_attributes={})
    dlq_arn = get_topic_attributes(dlq_name)['QueueArn']
    print("Created dead-letter topic at URL: {}".format(
        dlq_response['QueueUrl']))

    # new queue:
    attributes = {
        'MessageRetentionPeriod': '604800',  # 7 days
        'VisibilityTimeout': '300',  # seconds
        'ReceiveMessageWaitTimeSeconds': '20',  # seconds
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '10'
        }),
    }
    queue_response = create_queue(topic_name, attributes)
    print("Created topic at URL: {}".format(queue_response['QueueUrl']))
def create_queue(queue_name, queue_attributes):
    """
    AWS API request to create a new queue with the requested attributes.

    If the client raises `QueueNameExists`, the existing queue is deleted and
    the creation is attempted once more after a 60 second pause.

    :param queue_name: the name of queue to create
    :type queue_name: str

    :param queue_attributes: settings for new queue
    :type queue_attributes: dict

    :return: aws response
    :rtype: dict
    """
    client = _get_client()
    request = {'QueueName': queue_name, 'Attributes': queue_attributes}
    try:
        return client.create_queue(**request)
    except client.exceptions.QueueNameExists:
        print("Topic already exists. Overriding...")
        delete_topic(queue_name)
        # Note: need to wait 60 seconds before attempting to create queue with
        # the same name
        time.sleep(60)
        return client.create_queue(**request)
def get_dead_letter_topic_name(topic_name):
    """
    :param topic_name: name of the topic whose dead-letter topic is wanted
    :type topic_name: str

    :return: `topic_name` with the `_DLQ` suffix appended
    :rtype: str
    """
    return '{}_DLQ'.format(topic_name)
# ============================================================================== # Delete Topics / Queues # ==============================================================================
def clear_topics(prefix):
    """
    Delete every topic in the aws account whose name starts with `prefix`.
    Note that this action is irreversible.

    :param prefix: topic names that start with prefix to clear.
    :type prefix: str

    :raises ValueError: if `prefix` is the empty string (which would match
        every topic)
    """
    prefix_missing = prefix == ''
    if prefix_missing:
        raise ValueError(f'Topic prefix is not specified: {prefix=}')

    for topic_url in list_topic_urls(prefix):
        delete_topic_by_url(topic_url)
def delete_topic(topic_name):
    """
    Look up the URL of the named topic and delete it from the current aws
    account. Note that this action is irreversible.

    :param topic_name: topic to delete
    :type topic_name: str
    """
    delete_topic_by_url(get_topic_url(topic_name))
    print("Deleted topic: {}".format(topic_name))
def delete_topic_by_url(topic_url):
    """
    Issue the AWS delete-queue request for the topic at `topic_url`. Note that
    this action is irreversible.

    :param topic_url: URL of topic to delete
    :type topic_url: str
    """
    _get_client().delete_queue(QueueUrl=topic_url)
    print("Deleted topic at URL: {}".format(topic_url))
# ============================================================================== # Topic / Queue Helper Methods # ==============================================================================
def list_topic_urls(prefix=''):
    """
    Lazily yield the URL of every available topic whose name starts with
    `prefix`. This is a generator: no request is made until it is iterated.

    :param prefix: topic prefix to filter results by.
    :type prefix: str

    :return: iterator over topic URLs
    :rtype: Iterator[str]
    """
    matching_queues = _get_resource().queues.filter(QueueNamePrefix=prefix)
    yield from (queue.url for queue in matching_queues)
def get_num_available_messages(topic_name):
    """
    Read the `ApproximateNumberOfMessages` attribute of a topic.

    Note that this method requires at least 60 secs to properly account for
    available messages from the last message sent, even then it is not an
    exact count.

    :param topic_name: topic to retrieve messages for
    :type topic_name: str

    :return: the number of approximate available messages
    :rtype: int
    """
    attribute = 'ApproximateNumberOfMessages'
    client = _get_client()
    response = client.get_queue_attributes(
        QueueUrl=get_topic_url(topic_name), AttributeNames=[attribute])
    return int(response['Attributes'][attribute])
def get_topic_url(topic_name):
    """
    Resolve a topic name to its URL through the AWS get-queue-url request.

    :param topic_name: name of the topic to look up
    :type topic_name: str

    :return: URL to topic
    :rtype: str
    """
    return _get_client().get_queue_url(QueueName=topic_name)['QueueUrl']
def get_topic_attributes(topic_name):
    """
    Fetch all attributes of a topic (requested with `AttributeNames=['All']`).

    :param topic_name: name of topic to retrieve attributes of
    :type topic_name: str

    :return: a dictionary of all attributes / settings for this topic
    :rtype: dict
    """
    client = _get_client()
    response = client.get_queue_attributes(QueueUrl=get_topic_url(topic_name),
                                           AttributeNames=['All'])
    return response['Attributes']
# ============================================================================== # Send Messages # ==============================================================================
def publish(topic_name, msg):
    """
    Send a single message to a topic.

    :param topic_name: name of topic to send messages to
    :type topic_name: str

    :param msg: message to publish
    :type msg: str

    :return: whether the aws response contains the `MD5OfMessageBody` key,
        which is taken as the sign that the publish call was executed
        successfully
    :rtype: bool
    """
    client = _get_client()
    aws_response = client.send_message(QueueUrl=get_topic_url(topic_name),
                                       MessageBody=msg)
    return 'MD5OfMessageBody' in aws_response
def publish_in_batches(topic_name, msgs):
    """
    Publish to aws in batches of _AWS_MAX_MESSAGES or _PUBLISH_MAX_MESSAGES,
    whichever holds a lower value.

    Entries reported under `Failed` in a batch response are collected and
    returned rather than raised; each message is converted with `str` before
    it is sent.

    :param topic_name: name of topic to send messages to
    :type topic_name: str

    :param msgs: iterator of messages to publish
    :type msgs: iterable[str]

    :return: (number of messages published, list of messages failed to publish)
    :rtype: (int, list[str])
    """
    client = _get_client()
    url = get_topic_url(topic_name)
    chunk_size = min(_PUBLISH_MAX_MESSAGES, _AWS_MAX_MESSAGES)

    published = 0
    unpublished = []
    for chunk in more_itertools.chunked(msgs, chunk_size):
        # the entry Id is the position within the chunk, so a failed entry can
        # be mapped back to the message it came from
        entries = []
        for position, body in enumerate(chunk):
            entries.append({'Id': str(position), 'MessageBody': str(body)})

        response = client.send_message_batch(QueueUrl=url, Entries=entries)
        failures = response.get('Failed', [])
        for failure in failures:
            unpublished.append(chunk[int(failure['Id'])])
        published += len(entries) - len(failures)

    print(f'Published {published} msgs to topic: {topic_name}')
    if unpublished:
        print(f'Failed to publish {len(unpublished)} to topic: {topic_name}')
    return published, unpublished
# ============================================================================== # Receive Messages # ==============================================================================
def pull(topic_name,
         min_messages=1,
         max_messages=_DEFAULT_MAX_MESSAGES,
         receive_message_timeout=300):
    """
    Pull messages from requested topic. Keeps requesting messages until at
    least `min_messages` distinct messages (keyed by `MessageId`) have been
    collected or `receive_message_timeout` expires. With `min_messages=0` no
    request is made and an empty list is returned.

    :param topic_name: name of topic to pull messages from
    :type topic_name: str

    :param min_messages: minimum number of messages to retrieve
    :type min_messages: int

    :param max_messages: maximum number of messages to retrieve
    :type max_messages: int

    :param receive_message_timeout: expected time to wait to receive a message
        before returning in seconds.
    :type receive_message_timeout: int

    :return: list of aws message dicts
    :rtype: list[dict]

    :raises ValueError: if `min_messages` is negative or `max_messages` is
        smaller than `min_messages`
    """
    if min_messages < 0:
        raise ValueError(f'Invalid min_messages value of {min_messages}.')
    if max_messages < min_messages:
        raise ValueError(f'max_messages ({max_messages}) less than '
                         f'min_messages ({min_messages}).')

    deadline = time.time() + receive_message_timeout
    url = get_topic_url(topic_name)

    # keyed by MessageId so a message delivered twice is only kept once
    collected = {}
    while len(collected) < min_messages and time.time() < deadline:
        still_wanted = max_messages - len(collected)
        request_size = min(still_wanted, _AWS_MAX_MESSAGES)
        collected.update(
            (msg['MessageId'], msg)
            for msg in _receive_messages(url, request_size))

    pulled = list(collected.values())
    print(f'Pulled {len(pulled)} msgs from topic: {topic_name}')
    return pulled
def _receive_messages(topic_url, max_messages=_DEFAULT_MAX_MESSAGES):
    """
    AWS API request to receive messages, up to _AWS_MAX_MESSAGES at a time.

    :param topic_url: topic URL
    :type topic_url: str

    :param max_messages: maximum number of messages to request, must be <=
        _AWS_MAX_MESSAGES.
    :type max_messages: int

    :return: list of aws message dicts; empty when the response carries no
        `Messages` key
    :rtype: list[dict]

    :raises ValueError: if `max_messages` exceeds _AWS_MAX_MESSAGES
    """
    within_limit = max_messages <= _AWS_MAX_MESSAGES
    if not within_limit:
        raise ValueError(f'AWS SQS cannot receive more than {_AWS_MAX_MESSAGES}'
                         ' msgs per call!')

    aws_response = _get_client().receive_message(
        QueueUrl=topic_url,
        MaxNumberOfMessages=max_messages,
    )
    return aws_response.get('Messages', [])


# ==============================================================================
# Message Deletion / Acknowledgement
# ==============================================================================
def acknowledge(messages, topic_name):
    """
    Acknowledge (delete) messages in requested topic. Note that messages are
    batched by _AWS_MAX_MESSAGES per aws request.

    The count printed at the end is the number of messages submitted for
    acknowledgement, including any that aws reports as failed.

    :param messages: list of aws message dict
    :type messages: iterable[dict]

    :param topic_name: name of topic to acknowledge messages from
    :type topic_name: str

    :return: list of message dicts that were failed to be acknowledged by aws
    :rtype: list[dict]
    """
    url = get_topic_url(topic_name)
    submitted = 0
    not_acknowledged = []
    for chunk in more_itertools.chunked(messages, _AWS_MAX_MESSAGES):
        submitted += len(chunk)
        not_acknowledged.extend(_delete_messages(chunk, url))

    print(f'Acknowledged {submitted} msgs from topic: {topic_name}')
    if not_acknowledged:
        print(f'Failed to acknowledge {len(not_acknowledged)} msgs from topic: '
              f'{topic_name}')
    return not_acknowledged
def _delete_messages(messages, topic_url):
    """
    Delete messages from topic in a single batch request. Note that number of
    messages must be <= _AWS_MAX_MESSAGES.

    :param messages: list of aws message dict
    :type messages: list[dict]

    :param topic_url: topic URL
    :type topic_url: str

    :return: list of message dicts that were failed to be acknowledged by aws
    :rtype: list[dict]

    :raises ValueError: if more than _AWS_MAX_MESSAGES messages are given
    """
    if len(messages) > _AWS_MAX_MESSAGES:
        raise ValueError(f'AWS SQS cannot delete more than {_AWS_MAX_MESSAGES}'
                         ' msgs per call!')

    # the entry Id is the index into `messages`, used below to map failures
    # back to the original message dicts
    entries = []
    for index, message in enumerate(messages):
        entries.append({
            'Id': str(index),
            'ReceiptHandle': message['ReceiptHandle']
        })

    response = _get_client().delete_message_batch(QueueUrl=topic_url,
                                                  Entries=entries)
    return [
        messages[int(failure['Id'])] for failure in response.get('Failed', [])
    ]


# ==============================================================================
# Message Visibility Timeout
# ==============================================================================
def refresh_ack_timeout(messages, topic_name, timeout=300):
    """
    Refresh acknowledgement timeout (visibility timeout). This extends the
    timeout of the locked message in the topic so no other subscribers receive
    this message until the timeout expires. Note that this method resets the
    timeout rather than extending the existing timeout of the message.

    Messages are sent to aws in batches of _AWS_MAX_MESSAGES.

    :param messages: list of aws message dict
    :type messages: iterable[dict]

    :param topic_name: name of topic to refresh timeouts
    :type topic_name: str

    :param timeout: timeout in seconds
    :type timeout: int

    :return: list of message dicts whose timeout aws failed to refresh
    :rtype: list[dict]
    """
    url = get_topic_url(topic_name)
    failed_msgs = []
    for batch in more_itertools.chunked(messages, _AWS_MAX_MESSAGES):
        failed_msgs.extend(_change_message_visibility(batch, url, timeout))
    if failed_msgs:
        print(f'Failed to refresh acknowledgement deadline for '
              f'{len(failed_msgs)} msgs from topic: {topic_name}')
    return failed_msgs
def _change_message_visibility(messages, topic_url, timeout):
    """
    Change the message visibility timeout in a single batch request. Note that
    number of messages must be <= _AWS_MAX_MESSAGES.

    :param messages: list of aws message dict
    :type messages: list[dict]

    :param topic_url: URL of topic to modify visibility in
    :type topic_url: str

    :param timeout: timeout in seconds
    :type timeout: int

    :return: list of message dicts whose visibility timeout aws failed to
        change
    :rtype: list[dict]

    :raises ValueError: if more than _AWS_MAX_MESSAGES messages are given
    """
    if len(messages) > _AWS_MAX_MESSAGES:
        raise ValueError('AWS SQS cannot change visibility of more than '
                         f'{_AWS_MAX_MESSAGES} msgs per call!')
    client = _get_client()
    # the entry Id is the index into `messages`, used below to map failures
    # back to the original message dicts
    entries = [{
        'Id': str(i),
        'ReceiptHandle': msg['ReceiptHandle'],
        'VisibilityTimeout': timeout
    } for i, msg in enumerate(messages)]
    response = client.change_message_visibility_batch(QueueUrl=topic_url,
                                                      Entries=entries)
    failed_msgs = [
        messages[int(msg['Id'])] for msg in response.get('Failed', [])
    ]
    return failed_msgs


# ==============================================================================
# Core Methods
# ==============================================================================
def upload(fname, topic_name):
    """
    Upload the contents of a file to the requested topic. Each line is
    stripped of surrounding whitespace and handed to `_upload`, which groups
    the lines into messages and publishes them.

    :param fname: path to file to upload
    :type fname: str

    :param topic_name: name of topic to upload to
    :type topic_name: str

    :return: number of messages published
    :rtype: int
    """
    with open(fname, 'r') as infile:
        # consumed lazily, so the upload must finish while the file is open
        stripped_lines = map(str.strip, infile)
        return _upload(stripped_lines, topic_name)
def multi_file_upload(files, topic_name):
    """
    Upload multiple files as messages to `topic_name`. The stripped lines of
    all files are streamed, in order, as one sequence into `_upload`.

    :param files: file paths to upload; any iterable of paths is accepted
        (iterating a dict yields its keys)
    :type files: Iterable[str]

    :param topic_name: name of topic to upload to
    :type topic_name: str

    :return: number of messages published
    :rtype: int
    """

    def stripped_lines(paths):
        for path in paths:
            with open(path, 'r') as infile:
                yield from (line.strip() for line in infile)

    return _upload(stripped_lines(files), topic_name)
def _upload(lines, topic_name):
    """
    Upload given lines encoded into messages, where each message contains at
    most _MSG_PARCEL_SIZE number of line(s), to requested topic. Each message
    will only contain a maximum of _MAX_MSG_PARCEL_SIZE_IN_KB of data to avoid
    hitting AWS message body limits.

    :param lines: Iterator of lines / entries to upload
    :type lines: Iterator[str]

    :param topic_name: name of topic to upload to
    :type topic_name: str

    :return: number of messages published
    :rtype: int

    :raises RuntimeError: if more than _PUBLISH_FAILURE_PERCENTAGE percent of
        the messages failed to publish
    """
    print("uploading...")
    msgs = ('\n'.join(group) for group in _get_msg_parcels(lines))
    num_msgs, failed_msgs = publish_in_batches(topic_name, msgs)
    if failed_msgs:
        print(f'warning: the following lines failed to publish: {failed_msgs} '
              f'to {topic_name=}')
        # tolerate a small share of failures, abort once past the threshold
        if len(failed_msgs) > ((_PUBLISH_FAILURE_PERCENTAGE / 100) *
                               (num_msgs + len(failed_msgs))):
            raise RuntimeError('Too many failed messages during upload!')
    print("uploading done!")
    return num_msgs


def _get_msg_parcels(lines,
                     parcel_size=_MSG_PARCEL_SIZE,
                     max_parcel_size_in_kb=_MAX_MSG_PARCEL_SIZE_IN_KB):
    """
    Returns an iterator of message parcels, where each parcel either contains
    `parcel_size` number of lines or fewer depending on the max parcel size
    requested and how large each line is.

    Line sizes are measured as the UTF-8 encoded length divided by 1000. Every
    line of `lines` ends up in exactly one parcel: blank lines are kept as
    entries instead of being mistaken for the end of the input, and a line
    that on its own exceeds `max_parcel_size_in_kb` is placed in a parcel by
    itself. Empty parcels are never yielded.

    :param lines: Iterator of lines / entries to upload
    :type lines: Iterator[str]

    :param parcel_size: number of lines / items per parcel; superseded by
        `max_parcel_size_in_kb` parameter.
    :type parcel_size: int

    :param max_parcel_size_in_kb: maximum size of each individual parcel
    :type max_parcel_size_in_kb: float

    :return: an iterator of parcels.
    :rtype: Iterator[List]
    """
    parcel = []
    current_parcel_size = 0
    for line in lines:
        line_size = len(line.encode('utf-8')) / 1000
        parcel_is_full = len(parcel) >= parcel_size
        line_overflows = current_parcel_size + line_size > max_parcel_size_in_kb
        # only flush a non-empty parcel, otherwise an oversized first line
        # would produce an empty message
        if parcel and (parcel_is_full or line_overflows):
            yield parcel
            parcel = []
            current_parcel_size = 0
        parcel.append(line)
        current_parcel_size += line_size
    if parcel:
        yield parcel
def download(topic_name, fname):
    """
    Download all messages in given topic into a file. The messages are pulled
    in batches of `_PULL_MAX_MESSAGES` and `_RECEIVE_TIMEOUT` is used to
    determine how long to wait before assuming the topic is exhausted. Each
    message is written out as a new line, and each batch is acknowledged once
    it has been written.

    :param topic_name: name of topic to pull messages from
    :type topic_name: str

    :param fname: path to file to write messages out to
    :type fname: str
    """
    print('downloading...')
    with open(fname, 'w') as outfile:
        while True:
            batch = pull(topic_name,
                         min_messages=_PULL_MAX_MESSAGES,
                         max_messages=_PULL_MAX_MESSAGES,
                         receive_message_timeout=_RECEIVE_TIMEOUT)
            if not batch:
                # an empty pull means the topic is considered exhausted
                break
            outfile.write(''.join([msg['Body'] + '\n' for msg in batch]))
            acknowledge(batch, topic_name)
    print('download complete!')
def _write_input_file_from_msgs(msgs, fname): """ :param msgs: list of aws message dicts :type msgs: iterable[dict] :param fname: filename :type fname: str """ _write_input_file((msg['Body'] for msg in msgs), fname) def _write_input_file(lines, fname): """ :param msgs: list of lines to write :type msgs: iterable[str] :param fname: filename :type fname: str """ with open(fname, 'w') as f: f.write('\n'.join(lines))
def pull_and_process(input_topic, output_topic, settings_file, step_path,
                     workflow_id, batch_size):
    """
    Repeatedly pull messages from `input_topic`, run the step task on them as
    a subprocess, and upload the results to `output_topic`, until a pull
    returns no messages or the worker timeout has passed.

    The worker timeout is read from the `SCHRODINGER_CLOUD_WORKER_TIMEOUT`
    environment variable, interpreted in minutes; when unset there is no
    timeout.

    Step outputs are buffered as files and only uploaded once the buffered
    output count reaches `_MSG_PARCEL_SIZE`; whatever remains is uploaded
    after the loop ends.

    :param input_topic: input topic name
    :type input_topic: str

    :param output_topic: output topic name
    :type output_topic: str

    :param settings_file: file path to settings yaml file for stepper
    :type settings_file: str

    :param step_path: complete path to step to execute in a subprocess call
    :type step_path: str

    :param workflow_id: step workflow ID
    :type workflow_id: str

    :param batch_size: number of data values / entries to process in a batch
    :type batch_size: int

    :return: (total input count, total output count) accumulated from the
        counts returned by `process_msgs` -- i.e. entry counts, not message
        counts
    :rtype: (int, int)
    """
    start_time = time.time()
    # the env var is multiplied by 60, i.e. it is expressed in minutes
    timeout_time = start_time + 60 * float(
        os.environ.get('SCHRODINGER_CLOUD_WORKER_TIMEOUT', float('inf')))
    total_msgs_processed = total_msgs_uploaded = 0
    total_input_count = total_output_count = 0
    # keeps track of output files : output count till parcel threshold is met
    # to upload:
    output_files = {}
    # the number of msgs to pull to meet the requested batch_size:
    msg_batch_size = math.ceil(batch_size / _MSG_PARCEL_SIZE)

    while msgs := pull(input_topic, msg_batch_size, msg_batch_size,
                       _RECEIVE_TIMEOUT):
        # the timeout is only checked after a pull; messages pulled in that
        # final iteration are left unacknowledged
        timed_out = time.time() > timeout_time
        if timed_out:
            print("WARNING: Timed out")
            break

        if batch_size < _MSG_PARCEL_SIZE:
            # msg_batch_size is 1 in this branch, so exactly one msg is pulled
            assert len(msgs) == 1, (f'only expected a single msg but '
                                    f'received {len(msgs)}')
            if len(msgs[0]['Body'].split('\n')) > batch_size:
                # need to break apart msg body into chunks of batch_size and
                # republish as new msgs to input topic
                batch_and_republish(msgs[0], input_topic, batch_size)
                continue

        output_fname, input_count, output_count = process_msgs(
            msgs, input_topic, output_topic, settings_file, step_path,
            workflow_id)
        output_files[output_fname] = output_count
        total_input_count += input_count
        total_output_count += output_count

        msgs_uploaded = 0
        # only upload once enough outputs are buffered to fill a parcel
        if sum(output_files.values()) >= _MSG_PARCEL_SIZE:
            msgs_uploaded = multi_file_upload(output_files, output_topic)
            output_files = {}

        # NOTE(review): inputs are acknowledged here even when their outputs
        # are still only buffered on disk -- confirm this is intended
        print("acknowledging messages...")
        acknowledge(msgs, input_topic)

        total_msgs_uploaded += msgs_uploaded
        total_msgs_processed += len(msgs)
        # NOTE(review): print_statistics / print_total_statistics are not
        # defined in the visible part of this file -- presumably elsewhere
        print_statistics(len(msgs), msgs_uploaded, total_msgs_processed,
                         total_msgs_uploaded, input_count, output_count,
                         total_input_count, total_output_count)

    # flush the outputs that never reached the parcel threshold
    if len(output_files):
        msgs_uploaded = multi_file_upload(output_files, output_topic)
        total_msgs_uploaded += msgs_uploaded

    print_total_statistics(total_msgs_processed, total_msgs_uploaded,
                           total_input_count, total_output_count)
    print(f"took {pretty_time_delta(time.time()-start_time)} to pull "
          f"{total_msgs_processed} msgs.")
    return total_input_count, total_output_count
def batch_and_republish(msg, input_topic, batch_size):
    """
    Break a single message body into batch_size parcels and republish as new
    messages to input_topic. The original message is acknowledged afterwards.

    NOTE(review): the original message is acknowledged even when some of the
    smaller messages failed to republish (only a warning is printed) --
    confirm that losing those entries is acceptable.

    :param msg: single aws message dict to republish in smaller chunks.
    :type msg: dict

    :param input_topic: input topic name
    :type input_topic: str

    :param batch_size: number of entries to repackage in a single message.
    :type batch_size: int
    """
    entries = msg['Body'].split('\n')
    smaller_msgs = []
    for chunk in more_itertools.chunked(entries, batch_size):
        smaller_msgs.append('\n'.join(chunk))

    published, unpublished = publish_in_batches(input_topic, smaller_msgs)
    if unpublished:
        print(f'Warning: batched and republished {published} msgs while '
              f'{len(unpublished)} failed to republish')

    print("acknowledging message...")
    acknowledge([msg], input_topic)
def process_msgs(msgs, input_topic, output_topic, settings_file, step_path,
                 workflow_id):
    """
    Process the requested msgs by writing out to disk to use as inputs for
    step_path. Only the acknowledgement deadline is refreshed, while the msgs
    themselves will remain unacknowledged.

    While the step subprocess is running, the acknowledgement deadline of
    `msgs` is refreshed roughly every 10 seconds. If the subprocess exits with
    a non-zero return code, the message bodies are published to the
    dead-letter topic derived from `output_topic` and an error is raised.

    :param msgs: non-empty list of aws message dicts to process; the
        `MessageId` of the first one is used to name the output file
    :type msgs: list[dict]

    :param input_topic: input topic name
    :type input_topic: str

    :param output_topic: output topic name
    :type output_topic: str

    :param settings_file: file path to settings yaml file for stepper
    :type settings_file: str

    :param step_path: complete path to step to execute in a subprocess call
    :type step_path: str

    :param workflow_id: step workflow ID
    :type workflow_id: str

    :return: (output file name from the step workflow, number of lines in the
        input file, number of lines in the output file)
    :rtype: (str, int, int)

    :raises RuntimeError: if the step subprocess exits with a non-zero return
        code
    """
    input_fname = 'inputs.in'
    output_fname = f"outputs-{msgs[0]['MessageId']}.out"
    _write_input_file_from_msgs(msgs, input_fname)
    cmdlist = _generate_cmdlist(step_path, settings_file, workflow_id,
                                input_fname, output_fname)
    print("Running cmd...:", ' '.join(cmdlist))
    with subprocess.Popen(cmdlist, stdin=subprocess.PIPE) as child:
        print("started child process...")
        # keep the msgs locked for this worker for as long as the step runs
        while child.poll() is None:
            print("refreshing ack...")
            refresh_ack_timeout(msgs, input_topic)
            time.sleep(10)

        if child.returncode != 0:
            # stdout / stderr are not piped, so the child's streams are not
            # available here (the attributes are None); report the exit
            # status instead
            print(f'child process exited with return code {child.returncode}')
            # NOTE(review): this targets the dead-letter topic of the *output*
            # topic -- confirm that is the intended destination
            print('publishing msgs to dead-letter queue...')
            publish_in_batches(get_dead_letter_topic_name(output_topic),
                               (msg['Body'] for msg in msgs))
            raise RuntimeError("Child process failed.")

    return output_fname, get_count(input_fname), get_count(output_fname)
def get_count(filepath):
    """
    Count the lines of a text file by iterating over it.

    :param filepath: path to file to get number of line.
    :type filepath: str

    :return: the number of lines in a file.
    :rtype: int
    """
    with open(filepath, 'r') as infile:
        return sum(1 for _line in infile)
def _generate_cmdlist(step_path, settings_file, workflow_id, input_fname, output_fname='outputs.out'): """ :return: generated cmdlist for stepper execution. :rtype: dict """ return [ os.path.join(os.environ['SCHRODINGER'], 'run'), "stepper.py", step_path, input_fname, output_fname, '-config', settings_file, '-workflow-id', workflow_id, '-wait', '-no-job' ]
def pretty_time_delta(seconds):
    """
    Format a duration as a compact string such as `1d2h3m4s`, `5m0s` or `7s`.
    Leading units that are zero are omitted; fractions of a second are
    truncated and negative durations are prefixed with `-`.

    :param seconds: duration in seconds
    :type seconds: int or float

    :return: human readable duration
    :rtype: str
    """
    prefix = '-' if seconds < 0 else ''
    total_minutes, secs = divmod(abs(int(seconds)), 60)
    total_hours, minutes = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)

    if days > 0:
        body = f'{days}d{hours}h{minutes}m{secs}s'
    elif hours > 0:
        body = f'{hours}h{minutes}m{secs}s'
    elif minutes > 0:
        body = f'{minutes}m{secs}s'
    else:
        body = f'{secs}s'
    return prefix + body