Source code for hedwig.commands

import json
import logging

import funcy

from hedwig.consumer import get_default_queue_name, get_queue, get_queue_messages


class PartialFailure(Exception):
    """
    Error indicating either send_messages or delete_messages API call failed partially

    Attributes set on the instance:

    - ``success_count``: number of entries under the ``Successful`` key of ``result``
    - ``failure_count``: number of entries under the ``Failed`` key of ``result``
    - ``result``: the raw batch result this error was built from

    :param result: dict-like batch result; the ``Successful`` and ``Failed`` keys are each optional
    """

    def __init__(self, result):
        # Forward ``result`` so ``args`` stays ``(result,)`` exactly as before.
        super().__init__(result)
        # A missing (or None) key is counted as zero entries instead of raising KeyError while
        # the exception itself is being constructed, which would hide the real failure.
        self.success_count = len(result.get('Successful') or [])
        self.failure_count = len(result.get('Failed') or [])
        self.result = result


def _enqueue_messages(queue, queue_messages) -> None:
    """
    Send a batch of messages to ``queue`` with a single ``send_messages`` call.

    :param queue: object exposing ``send_messages(Entries=...)`` that returns a dict-like result
    :param queue_messages: iterable of objects with ``message_id``, ``body`` and
        ``message_attributes`` attributes
    :raises PartialFailure: if the result of ``send_messages`` has a non-empty ``Failed`` entry
    """
    entries = []
    for queue_message in queue_messages:
        entry = {'Id': queue_message.message_id, 'MessageBody': queue_message.body}
        # Only attach attributes when there are some; a falsy value is left out of the entry.
        if queue_message.message_attributes:
            entry['MessageAttributes'] = queue_message.message_attributes
        entries.append(entry)

    # Nothing to send: skip the call rather than submitting an empty batch.
    if not entries:
        return

    result = queue.send_messages(Entries=entries)
    if result.get('Failed'):
        raise PartialFailure(result)


def get_dead_letter_queue(queue):
    """
    Look up the dead letter queue configured for ``queue``.

    The queue's ``RedrivePolicy`` attribute is parsed as JSON, and the text after the last ``:``
    of its ``deadLetterTargetArn`` value is used as the name passed to ``get_queue``.

    :param queue: object whose ``attributes`` mapping contains a JSON ``RedrivePolicy`` entry
    :return: whatever ``get_queue`` returns for the derived queue name
    """
    redrive_policy = json.loads(queue.attributes['RedrivePolicy'])
    target_arn = redrive_policy['deadLetterTargetArn']
    # The final colon-separated segment of the ARN is the queue name.
    _, _, dlq_name = target_arn.rpartition(':')
    return get_queue(dlq_name)


def requeue_dead_letter(num_messages: int = 10, visibility_timeout: int = None) -> None:
    """
    Re-queues everything in the Hedwig DLQ back into the Hedwig queue.

    Messages are fetched from the DLQ in batches. Each batch is first sent to the main queue and
    only then deleted from the DLQ. The loop stops at the first fetch that returns no messages.

    :param num_messages: Maximum number of messages to fetch in one SQS call. Defaults to 10.
    :param visibility_timeout: The number of seconds the message should remain invisible to other queue readers.
        Defaults to None, which is queue default
    :raises PartialFailure: if any message of a batch could not be sent to the main queue or could not be
        deleted from the DLQ
    """
    queue = get_queue(get_default_queue_name())
    dead_letter_queue = get_dead_letter_queue(queue)

    logging.info("Re-queueing messages from {} to {}".format(dead_letter_queue.url, queue.url))
    while True:
        queue_messages = get_queue_messages(
            dead_letter_queue, num_messages=num_messages, visibility_timeout=visibility_timeout, wait_timeout_s=1
        )
        if not queue_messages:
            break

        logging.info("got {} messages from dlq".format(len(queue_messages)))

        # Enqueue before deleting so a failed send never drops messages from the DLQ.
        _enqueue_messages(queue, queue_messages)
        result = dead_letter_queue.delete_messages(
            Entries=[{'Id': message.message_id, 'ReceiptHandle': message.receipt_handle} for message in queue_messages]
        )
        # Surface a partially failed delete instead of ignoring it: those messages are already in
        # the main queue and would otherwise be silently re-queued again on a later fetch.
        if result.get('Failed'):
            raise PartialFailure(result)

        logging.info("Re-queued {} messages".format(len(queue_messages)))