Source code for hedwig.consumer
import threading
import typing
from hedwig.backends.utils import get_consumer_backend
[docs]def process_messages_for_lambda_consumer(lambda_event: dict) -> None:
sns_consumer_backend = get_consumer_backend()
sns_consumer_backend.process_messages(lambda_event)
[docs]def listen_for_messages(
num_messages: int = 10, visibility_timeout_s: typing.Optional[int] = None, shutdown_event: threading.Event = None
) -> None:
"""
Starts a Hedwig listener for message types provided and calls the callback handlers like so:
.. code-block:: python
callback_fn(message)
The message is explicitly deleted only if callback function ran successfully. In case of an exception the message is
kept on queue and processed again. If the callback keeps failing, the message is moved to the dead-letter queue.
This function is blocking by default. It may be stopped by passing a shut down event object which can be set to
stop the function.
:param num_messages: Maximum number of messages to fetch in one API call. Defaults to 10
:param visibility_timeout_s: The number of seconds the message should remain invisible to other queue readers.
Defaults to None, which is queue default
:param shutdown_event: An event to signal that the process should shut down. This prevents more messages from
being de-queued and function exits after the current messages have been processed.
"""
if not shutdown_event:
shutdown_event = threading.Event()
consumer_backend = get_consumer_backend()
consumer_backend.fetch_and_process_messages(
num_messages=num_messages, visibility_timeout=visibility_timeout_s, shutdown_event=shutdown_event
)