# Source code for hedwig.commands

import json
import logging

import funcy

from hedwig.consumer import get_default_queue_name, get_queue, get_queue_messages

class PartialFailure(Exception):
    """
    Error indicating either send_messages or delete_messages API call failed partially

    :ivar success_count: number of entries listed under ``result['Successful']``
    :ivar failure_count: number of entries listed under ``result['Failed']``
    :ivar result: the batch-call response this error was built from
    """

    def __init__(self, result):
        # Passing `result` through keeps `args` / `str(exc)` the same as when
        # Exception's default constructor handled it.
        super().__init__(result)
        # Use .get() so that a response lacking one of the lists (e.g. nothing
        # succeeded) does not raise KeyError while the real error is being built.
        self.success_count = len(result.get('Successful', []))
        self.failure_count = len(result.get('Failed', []))
        self.result = result

def _enqueue_messages(queue, queue_messages) -> None:
    """
    Send a batch of messages to ``queue`` with a single ``send_messages`` call.

    :param queue: object exposing ``send_messages(Entries=[...])`` that returns a dict-like result
    :param queue_messages: iterable of messages exposing ``message_id``, ``body`` and
        ``message_attributes``
    :raises PartialFailure: if the result contains a non-empty ``'Failed'`` list
    """
    entries = [
        {
            'Id': queue_message.message_id,
            'MessageBody': queue_message.body,
            # Only include the attributes key when there is something to send;
            # a falsy value (None / empty dict) is left out of the entry entirely.
            **({'MessageAttributes': queue_message.message_attributes} if queue_message.message_attributes else {}),
        }
        for queue_message in queue_messages
    ]

    result = queue.send_messages(Entries=entries)
    if result.get('Failed'):
        raise PartialFailure(result)

def get_dead_letter_queue(queue):
    """
    Look up the dead letter queue configured for ``queue``.

    Reads the JSON-encoded ``RedrivePolicy`` entry of ``queue.attributes``, takes its
    ``deadLetterTargetArn`` value and resolves the last colon-separated segment of
    that value through ``get_queue``.

    :param queue: object whose ``attributes`` mapping contains a ``'RedrivePolicy'`` JSON string
    :return: whatever ``get_queue`` returns for the extracted name
    """
    redrive_policy = json.loads(queue.attributes['RedrivePolicy'])
    target_arn = redrive_policy['deadLetterTargetArn']
    # Everything after the final ':' is used as the queue name.
    dlq_name = target_arn.rsplit(':', 1)[-1]
    return get_queue(dlq_name)

def requeue_dead_letter(num_messages: int = 10, visibility_timeout: int = None) -> None:
    """
    Re-queues everything in the Hedwig DLQ back into the Hedwig queue.

    :param num_messages: Maximum number of messages to fetch in one SQS call. Defaults to 10.
    :param visibility_timeout: The number of seconds the message should remain invisible to other queue readers.
        Defaults to None, which is queue default
    :raises PartialFailure: if re-sending a fetched batch reports failed entries
        (raised by ``_enqueue_messages``)
    """
    queue = get_queue(get_default_queue_name())
    dead_letter_queue = get_dead_letter_queue(queue)

    logging.info("Re-queueing messages from {} to {}".format(dead_letter_queue.url, queue.url))
    while True:
        queue_messages = get_queue_messages(
            dead_letter_queue, num_messages=num_messages, visibility_timeout=visibility_timeout, wait_timeout_s=1
        )
        # An empty fetch is treated as "DLQ drained" and ends the loop.
        if not queue_messages:
            break

        logging.info("got {} messages from dlq".format(len(queue_messages)))

        # Send first, delete second: if sending raises, the batch is never
        # deleted from the dead letter queue.
        _enqueue_messages(queue, queue_messages)
        # NOTE(review): the delete_messages result is not inspected, so a partial
        # delete failure goes unnoticed here — confirm whether that is intended.
        dead_letter_queue.delete_messages(
            Entries=[{'Id': message.message_id, 'ReceiptHandle': message.receipt_handle} for message in queue_messages]
        )

        logging.info("Re-queued {} messages".format(len(queue_messages)))