schrodinger.stepper.cloud.aws_sqs module

AWS SQS API library.

Simple Queue Service (SQS): works primarily as a publisher & subscriber model.

The available methods here are modeled on GCP’s PubSub service. The outline and terminology are similar to pubsub.py, and the actions available to users match the invocations in stepper.py.

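Terminology:

+-------------------------+--------------------------+
| PubSub                  | SQS                      |
+-------------------------+--------------------------+
| Topic                   | Queue                    |
+-------------------------+--------------------------+
| Subscriber              | N/A (general API call)   |
+-------------------------+--------------------------+
| Push                    | Send Messages            |
+-------------------------+--------------------------+
| Pull                    | Receive Messages         |
+-------------------------+--------------------------+
| Acknowledge             | Delete Message           |
+-------------------------+--------------------------+
| Acknowledgment Deadline | Visibility Timeout       |
+-------------------------+--------------------------+
| Message -> 1 entry      | Message -> 1 parcel =    |
|                         | _MSG_PARCEL_SIZE entries |
+-------------------------+--------------------------+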

#===============================================================================
# Actions:
#===============================================================================

The following high-level methods are available:

1. List all topics available in the current user’s account.
2. Clear all topics in the current user’s account.
3. Create a new topic (this also creates a dead-letter topic, and deletes and recreates both if they already exist).
4. Upload a file to a topic by distributing its contents into messages.
5. Download messages from a topic into a file.
6. Process messages from an input topic by running a step task, and upload the results to an output topic.

#===============================================================================
# AWS Services
#===============================================================================

For AWS credential configuration, see the schrodinger.stepper.cloud.aws_client.get_client() docstring.

#===============================================================================
# Message Parcelling
#===============================================================================

Multiple entries (inputs) are parcelled into a single message in an SQS topic (queue). The number of entries per parcel is set by the _MSG_PARCEL_SIZE constant. Parcelling keeps the message count low enough to avoid hitting the SQS limit of 120,000 in-flight messages. A message is in flight once the queue has delivered it to a consumer (subscriber) that has not yet acknowledged it, and its acknowledgement deadline (visibility timeout) has not yet expired.
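As a rough illustration of why parcelling matters, the arithmetic below counts the messages needed to carry a number of entries and the maximum number of entries that can be in flight at once. The parcel size of 100 is a hypothetical value chosen for the example, not the module's _MSG_PARCEL_SIZE, and the helper names are illustrative only.

```python
# In-flight message limit for an SQS queue, as quoted above.
IN_FLIGHT_LIMIT = 120_000


def messages_needed(num_entries: int, parcel_size: int) -> int:
    """Number of messages needed to carry num_entries entries."""
    # Integer ceiling division: a partially filled parcel is still a message.
    return (num_entries + parcel_size - 1) // parcel_size


def max_entries_in_flight(parcel_size: int) -> int:
    """Entries that can be in flight before the message limit is reached."""
    return IN_FLIGHT_LIMIT * parcel_size


# Hypothetical parcel size; the module's _MSG_PARCEL_SIZE may differ.
PARCEL_SIZE = 100
print(messages_needed(1_000_001, PARCEL_SIZE))  # 10001
print(max_entries_in_flight(PARCEL_SIZE))       # 12000000
```

With one entry per message, the same 1,000,001 entries would need 1,000,001 messages, well past the in-flight limit if consumed concurrently.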

There are two workflow patterns when processing these messages inside aws_sqs.pull_and_process:

  1. batch_size < _MSG_PARCEL_SIZE:

When the requested batch_size is smaller than the default message parcel size, a single message’s body is broken down into smaller parcels of size batch_size, which are republished to the input topic. Note that some messages may contain parcels smaller than the requested batch_size, so workers must be designed to handle this case too.

  2. batch_size >= _MSG_PARCEL_SIZE:

When the requested batch_size is greater than or equal to the default message parcel size, one or more whole messages are consumed and processed. Note that if the requested batch_size is not a multiple of the default message parcel size, the number of entries processed by the worker will be greater than batch_size. Thus, workers must be designed to handle inputs larger than the requested batch_size.
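The choice between the two patterns, and the rounding up in the second one, can be sketched as follows. This illustrates the behaviour described above rather than reproducing the module's code; plan_batch is a hypothetical helper.

```python
def plan_batch(batch_size: int, parcel_size: int):
    """Return (pattern, upper bound on entries handed to one worker)."""
    if batch_size < parcel_size:
        # Pattern 1: a message body is split into parcels of at most
        # batch_size entries, which are republished to the input topic.
        return ("split", batch_size)
    # Pattern 2: consume whole messages, rounding up. If batch_size is not
    # a multiple of parcel_size, the worker receives extra entries.
    num_msgs = (batch_size + parcel_size - 1) // parcel_size
    return ("consume", num_msgs * parcel_size)


print(plan_batch(30, 100))   # ('split', 30)
print(plan_batch(250, 100))  # ('consume', 300)
```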

Outputs:

All step outputs are also parcelled into groups of _MSG_PARCEL_SIZE entries and uploaded to the requested output topic. Note that some messages may contain parcels smaller than _MSG_PARCEL_SIZE, so the consumer of the output topic must be able to handle this discrepancy.

schrodinger.stepper.cloud.aws_sqs.print(*args)

We override the built-in print in order to add timestamps to debugging statements.
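A minimal sketch of such an override, assuming a timestamp format of %Y-%m-%d %H:%M:%S (the module's actual format is not documented here). Delegating to builtins.print keeps the usual keyword arguments working.

```python
import builtins
import datetime


def print(*args, **kwargs):
    """Shadow the built-in print, prefixing output with a timestamp."""
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Delegate to the real print so sep, end, file and flush still work.
    builtins.print(f"[{stamp}]", *args, **kwargs)


print("pulled", 10, "messages")  # bracketed timestamp, then: pulled 10 messages
```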

schrodinger.stepper.cloud.aws_sqs.create_topic(topic_name)

Create a new topic as well as a dead-letter topic that tracks messages received more than twice by subscribers. Note that if the topics already exist, both are deleted and recreated, so any messages in these two topics are lost permanently.

Parameters

topic_name (str) – name of the new topic, also used as the prefix of the dead-letter topic name.
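The topic and its dead-letter topic can be wired together through the SQS RedrivePolicy attribute. A minimal sketch, assuming a boto3 SQS client (boto3.client("sqs")) is passed in; the "_dead_letter" suffix is hypothetical (the real name comes from get_dead_letter_topic_name), and the delete-if-exists step is omitted. With maxReceiveCount set to 2, SQS moves a message to the dead-letter queue once it has been received more than twice.

```python
import json


def create_topic_with_dlq(sqs, topic_name, max_receive_count=2):
    """Create a queue plus a dead-letter queue and link them (sketch).

    `sqs` is assumed to be a boto3 SQS client; the "_dead_letter" suffix is
    hypothetical. Returns the URL of the main queue.
    """
    dlq_url = sqs.create_queue(QueueName=f"{topic_name}_dead_letter")["QueueUrl"]
    # The redrive policy refers to the dead-letter queue by ARN, not URL.
    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"]
    redrive = {"deadLetterTargetArn": dlq_arn,
               "maxReceiveCount": str(max_receive_count)}
    # Messages received more than max_receive_count times move to the DLQ.
    resp = sqs.create_queue(QueueName=topic_name,
                            Attributes={"RedrivePolicy": json.dumps(redrive)})
    return resp["QueueUrl"]
```

Passing the client in, rather than creating it inside the function, keeps the sketch easy to exercise against a stub.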

schrodinger.stepper.cloud.aws_sqs.create_queue(queue_name, queue_attributes)

AWS API request to create a new queue with the requested queue attributes.

Parameters
  • queue_name (str) – the name of the queue to create

  • queue_attributes (dict) – settings for the new queue

Returns

AWS response

Return type

dict

schrodinger.stepper.cloud.aws_sqs.get_dead_letter_topic_name(topic_name)
schrodinger.stepper.cloud.aws_sqs.clear_topics(prefix)

Removes all topics in the current AWS account whose names start with the given prefix. Note that this action is irreversible.

Parameters

prefix (str) – prefix of the topic names to clear.
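Clearing topics amounts to a paginated list_queues call followed by delete_queue on each match. A sketch assuming a boto3 SQS client is passed in; list_queue_urls and clear_queues are illustrative names, not functions of this module. Note that an empty prefix matches every queue in the account.

```python
def list_queue_urls(sqs, prefix=""):
    """Return every queue URL whose name starts with prefix (sketch)."""
    urls, token = [], None
    while True:
        kwargs = {"MaxResults": 1000}  # list_queues returns pages of results
        if prefix:
            kwargs["QueueNamePrefix"] = prefix
        if token:
            kwargs["NextToken"] = token
        resp = sqs.list_queues(**kwargs)
        # "QueueUrls" is omitted from the response when nothing matches.
        urls.extend(resp.get("QueueUrls", []))
        token = resp.get("NextToken")
        if not token:
            return urls


def clear_queues(sqs, prefix):
    """Delete every queue whose name starts with prefix. Irreversible."""
    urls = list_queue_urls(sqs, prefix)
    for url in urls:
        sqs.delete_queue(QueueUrl=url)
    return len(urls)
```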

schrodinger.stepper.cloud.aws_sqs.delete_topic(topic_name)

Deletes a topic by name in the current AWS account. Note that this action is irreversible.

Parameters

topic_name (str) – topic to delete

schrodinger.stepper.cloud.aws_sqs.delete_topic_by_url(topic_url)

Deletes a topic by URL in the current AWS account. Note that this action is irreversible.

Parameters

topic_url (str) – URL of topic to delete

schrodinger.stepper.cloud.aws_sqs.list_topic_urls(prefix='')
Parameters

prefix (str) – topic prefix to filter results by.

Returns

URLs of all available topics.

Return type

list[str]

schrodinger.stepper.cloud.aws_sqs.get_num_available_messages(topic_name)

Note that this method needs at least 60 seconds after the last message is sent to account for it properly, and even then the count is only approximate.

Parameters

topic_name (str) – name of the topic to count available messages in

Returns

the approximate number of available messages

Return type

int
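SQS exposes this figure through the ApproximateNumberOfMessages queue attribute, which is eventually consistent; messages currently in flight are reported separately as ApproximateNumberOfMessagesNotVisible. A sketch assuming a boto3 SQS client and a queue URL (rather than a topic name) are passed in; it is not necessarily how this module reads the count.

```python
def approximate_message_count(sqs, queue_url):
    """Approximate number of messages available for retrieval (sketch)."""
    resp = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    # Attribute values come back as strings, so convert explicitly.
    return int(resp["Attributes"]["ApproximateNumberOfMessages"])
```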

schrodinger.stepper.cloud.aws_sqs.get_topic_url(topic_name)
Parameters

topic_name (str) – name of the topic whose URL to retrieve

Returns

URL of the topic

Return type

str

schrodinger.stepper.cloud.aws_sqs.get_topic_attributes(topic_name)
Parameters

topic_name (str) – name of topic to retrieve attributes of

Returns

a dictionary of all attributes / settings for this topic

Return type

dict

schrodinger.stepper.cloud.aws_sqs.publish(topic_name, msg)
Parameters
  • topic_name (str) – name of topic to send messages to

  • msg (str) – message to publish

Returns

whether the publish call to AWS was executed successfully

Return type

bool

schrodinger.stepper.cloud.aws_sqs.publish_in_batches(topic_name, msgs)

Publish to AWS in batches of _AWS_MAX_MESSAGES or _PUBLISH_MAX_MESSAGES, whichever is lower.

Parameters
  • topic_name (str) – name of topic to send messages to

  • msgs (iterable[str]) – iterator of messages to publish

Returns

(number of messages published, list of messages that failed to publish)

Return type

(int, list[str])
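The batching described above can be sketched with send_message_batch, which accepts at most 10 entries per request. Assumptions: a boto3 SQS client and queue URL are passed in, and AWS_MAX_MESSAGES and PUBLISH_MAX_MESSAGES are stand-ins for the module's private constants with assumed values.

```python
from itertools import islice

# Stand-ins for _AWS_MAX_MESSAGES and _PUBLISH_MAX_MESSAGES (values assumed).
AWS_MAX_MESSAGES = 10       # SQS accepts at most 10 entries per batch call
PUBLISH_MAX_MESSAGES = 10


def publish_batches(sqs, queue_url, msgs, publish_max=PUBLISH_MAX_MESSAGES):
    """Send msgs in batches; return (num published, bodies that failed)."""
    batch_size = min(AWS_MAX_MESSAGES, publish_max)
    it = iter(msgs)
    published, failed = 0, []
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return published, failed
        # Entry Ids only have to be unique within a single request.
        entries = [{"Id": str(i), "MessageBody": body}
                   for i, body in enumerate(batch)]
        resp = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
        published += len(resp.get("Successful", []))
        # Map each failed entry Id back to the original message body.
        failed.extend(batch[int(f["Id"])] for f in resp.get("Failed", []))
```

Consuming msgs through islice means any iterable works, including a generator over a large file, without materializing it in memory.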

schrodinger.stepper.cloud.aws_sqs.pull(topic_name, min_messages=1, max_messages=10, receive_message_timeout=300)

Pull messages from requested topic. Guaranteed to return at least min_messages unless receive_message_timeout expires.

Parameters
  • topic_name (str) – name of topic to pull messages from

  • min_messages (int) – minimum number of messages to retrieve

  • max_messages (int) – maximum number of messages to retrieve

  • receive_message_timeout (int) – time in seconds to wait for messages before returning.

Returns

list of aws message dicts

Return type

list[dict]
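The "at least min_messages unless the timeout expires" contract can be sketched as a loop of long-polling receive_message calls. Assumptions: a boto3 SQS client and queue URL are passed in, and pull_messages is an illustrative name, not this module's function.

```python
import time


def pull_messages(sqs, queue_url, min_messages=1, max_messages=10,
                  receive_message_timeout=300):
    """Receive at least min_messages (up to max_messages) unless time runs out."""
    if min_messages > max_messages:
        raise ValueError("min_messages must not exceed max_messages")
    deadline = time.monotonic() + receive_message_timeout
    messages = []
    while len(messages) < min_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout expired: return whatever has been received
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            # A single receive_message call returns at most 10 messages.
            MaxNumberOfMessages=min(10, max_messages - len(messages)),
            # Long-poll for up to 20 s (the SQS maximum) per call.
            WaitTimeSeconds=min(20, max(1, int(remaining))),
        )
        messages.extend(resp.get("Messages", []))
    return messages
```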

schrodinger.stepper.cloud.aws_sqs.acknowledge(messages, topic_name)

Acknowledge messages in requested topic. Note that messages are batched by _AWS_MAX_MESSAGES per aws request.

Parameters
  • messages (iterable[dict]) – list of aws message dicts

  • topic_name (str) – name of topic to acknowledge messages from

Returns

list of message dicts that AWS failed to acknowledge

Return type

list[dict]
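Acknowledging a message in SQS means deleting it by its receipt handle. A sketch of the chunked deletion described above, assuming a boto3 SQS client is passed in and each message dict carries the ReceiptHandle key that receive_message returns; delete_messages is an illustrative name.

```python
def delete_messages(sqs, queue_url, messages, chunk_size=10):
    """Delete (acknowledge) messages; return those SQS failed to delete."""
    messages = list(messages)
    failed = []
    # delete_message_batch accepts at most 10 entries per request.
    for start in range(0, len(messages), chunk_size):
        batch = messages[start:start + chunk_size]
        entries = [{"Id": str(i), "ReceiptHandle": msg["ReceiptHandle"]}
                   for i, msg in enumerate(batch)]
        resp = sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)
        # Map each failed entry Id back to the original message dict.
        failed.extend(batch[int(f["Id"])] for f in resp.get("Failed", []))
    return failed
```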

schrodinger.stepper.cloud.aws_sqs.refresh_ack_timeout(messages, topic_name, timeout=300)

Refresh the acknowledgement timeout (visibility timeout). This keeps the locked messages in the topic hidden so that no other subscriber receives them until the timeout expires. Note that this method resets the timeout rather than extending the message’s existing timeout.

Parameters
  • messages (iterable[dict]) – list of aws message dicts

  • topic_name (str) – name of topic to refresh timeouts

  • timeout (int) – timeout in seconds

Returns

list of message dicts whose timeouts AWS failed to refresh

Return type

list[dict]
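In SQS this corresponds to change_message_visibility_batch: the new VisibilityTimeout counts from the moment of the call, which is why the timeout is reset rather than extended. SQS also caps the visibility timeout at 12 hours. A sketch assuming a boto3 SQS client is passed in; refresh_visibility is an illustrative name.

```python
def refresh_visibility(sqs, queue_url, messages, timeout=300):
    """Reset the visibility timeout of messages; return failures (sketch)."""
    messages = list(messages)
    failed = []
    for start in range(0, len(messages), 10):  # at most 10 entries per call
        batch = messages[start:start + 10]
        entries = [{"Id": str(i), "ReceiptHandle": msg["ReceiptHandle"],
                    # New timeout counts from now; it is not added to the old one.
                    "VisibilityTimeout": timeout}
                   for i, msg in enumerate(batch)]
        resp = sqs.change_message_visibility_batch(
            QueueUrl=queue_url, Entries=entries)
        failed.extend(batch[int(f["Id"])] for f in resp.get("Failed", []))
    return failed
```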

schrodinger.stepper.cloud.aws_sqs.upload(fname, topic_name)

Upload a file to the requested topic by encoding it into messages, each containing _MSG_PARCEL_SIZE line(s). The messages are published in batches of _AWS_MAX_MESSAGES or _PUBLISH_MAX_MESSAGES, whichever is lower.

Parameters
  • fname (str) – path to file to upload

  • topic_name (str) – name of topic to upload to

Returns

number of messages published

Return type

int
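Turning a file into parcelled message bodies can be done lazily, so large files are never read fully into memory. A sketch assuming entries are newline-joined inside a message body (the module's actual encoding is not documented here); iter_parcels is a hypothetical helper whose output could then be passed to publish_in_batches.

```python
from itertools import islice


def iter_parcels(path, parcel_size):
    """Yield message bodies, each holding up to parcel_size lines of path.

    Assumes newline-joined entries (hypothetical encoding). The final
    parcel may hold fewer lines.
    """
    with open(path) as fh:
        lines = (line.rstrip("\n") for line in fh)
        while True:
            parcel = list(islice(lines, parcel_size))
            if not parcel:
                return
            yield "\n".join(parcel)
```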

schrodinger.stepper.cloud.aws_sqs.multi_file_upload(files, topic_name)

Upload multiple files as messages to topic_name.

Parameters
  • files (List[str]) – List of file paths to upload

  • topic_name (str) – name of topic to upload to

Returns

number of messages published

Return type

int

schrodinger.stepper.cloud.aws_sqs.download(topic_name, fname)

Download all messages in the given topic into a file. The messages are pulled in batches of _PULL_MAX_MESSAGES, and _RECEIVE_TIMEOUT determines how long to wait before assuming the topic is exhausted. Each message is written out as a new line.

Parameters
  • topic_name (str) – name of topic to pull messages from

  • fname (str) – path to file to write messages out to

schrodinger.stepper.cloud.aws_sqs.pull_and_process(input_topic, output_topic, settings_file, step_path, workflow_id, batch_size)

Pull messages for a batch of batch_size entries from input_topic (see Message Parcelling above for how batch_size maps onto messages), run the step task as a subprocess, and upload the results to output_topic.

Parameters
  • input_topic (str) – input topic name

  • output_topic (str) – output topic name

  • settings_file (str) – file path to settings yaml file for stepper

  • step_path (str) – complete path to step to execute in a subprocess call

  • workflow_id (str) – step workflow ID

  • batch_size (int) – number of data values / entries to process in a batch

Returns

(messages pulled / processed, messages uploaded)

Return type

(int, int)

schrodinger.stepper.cloud.aws_sqs.print_statistics(msgs_processed, msgs_uploaded, total_msgs_processed, total_msgs_uploaded, input_count, output_count, total_input_count, total_output_count)
schrodinger.stepper.cloud.aws_sqs.print_total_statistics(total_msgs_processed, total_msgs_uploaded, total_input_count, total_output_count)
schrodinger.stepper.cloud.aws_sqs.batch_and_republish(msg, input_topic, batch_size)

Break a single message body into parcels of batch_size entries and republish them as new messages to input_topic.

Parameters
  • msg – single aws message dict to republish in smaller chunks.

  • input_topic (str) – input topic name

  • batch_size (int) – number of entries to repackage in a single message.
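The split step can be sketched as slicing the entries of one body into parcels of batch_size and handing each to a publish callable. Assumptions: entries are newline-separated in the body, and publish is any callable that sends one body to the input topic; split_and_republish is hypothetical. As noted under Message Parcelling, the last parcel may be smaller than batch_size.

```python
def split_and_republish(body, batch_size, publish):
    """Split a message body into parcels of batch_size entries (sketch).

    Assumes newline-separated entries; publish sends one body to the
    input topic. Returns the number of parcels published.
    """
    entries = body.split("\n")
    parcels = ["\n".join(entries[i:i + batch_size])
               for i in range(0, len(entries), batch_size)]
    for parcel in parcels:
        publish(parcel)
    return len(parcels)
```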

schrodinger.stepper.cloud.aws_sqs.process_msgs(msgs, input_topic, output_topic, settings_file, step_path, workflow_id)

Process the requested msgs by writing them to disk as inputs for step_path. Only the acknowledgement deadline is refreshed; the msgs themselves remain unacknowledged. The output file generated by the step is returned.

Parameters
  • input_topic (str) – input topic name

  • output_topic (str) – output topic name

  • settings_file (str) – file path to settings yaml file for stepper

  • step_path (str) – complete path to step to execute in a subprocess call

  • workflow_id (str) – step workflow ID

Returns

the output file name from the step workflow.

Return type

str

schrodinger.stepper.cloud.aws_sqs.get_count(filepath)
Parameters

filepath (str) – path to the file whose lines to count.

Returns

the number of lines in a file.

Return type

int
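A memory-friendly way to count lines, assuming a plain-text file; this is a sketch, not necessarily how get_count is implemented. Reading in binary mode avoids decoding costs, and a final line without a trailing newline is still counted.

```python
def count_lines(filepath):
    """Count lines in a file without loading it into memory."""
    with open(filepath, "rb") as fh:
        return sum(1 for _ in fh)
```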

schrodinger.stepper.cloud.aws_sqs.pretty_time_delta(seconds)