API reference
- hedwig.consumer.listen_for_messages(num_messages=10, visibility_timeout_s=None, shutdown_event=None)
Starts a Hedwig listener for message types provided and calls the callback handlers like so:
callback_fn(message)
The message is explicitly deleted only if the callback function runs successfully. If the callback raises an exception, the message is kept on the queue and processed again. If the callback keeps failing, the message is moved to the dead-letter queue.
This function blocks by default. To stop it, pass a shutdown event object and set that event.
- Parameters
num_messages (int) – Maximum number of messages to fetch in one API call. Defaults to 10.
visibility_timeout_s (Optional[int]) – The number of seconds the message should remain invisible to other queue readers. Defaults to None, which uses the queue default.
shutdown_event (Optional[Event]) – An event to signal that the process should shut down. Once set, no more messages are de-queued and the function exits after the current messages have been processed.
- Return type
None
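As a rough illustration of the shutdown pattern described above, the sketch below runs a listener in a worker thread and stops it by setting a threading.Event. The listen_for_messages defined here is a local stand-in written for this example: it only mirrors the documented signature and polls the event. It is not Hedwig's implementation, and the 10 ms polling interval is an arbitrary assumption.

```python
import threading
import time


def listen_for_messages(num_messages=10, visibility_timeout_s=None, shutdown_event=None):
    """Local stand-in that mirrors the documented signature (not Hedwig's code)."""
    if shutdown_event is None:
        # The documented function blocks indefinitely without an event;
        # the stand-in refuses instead so this example cannot hang.
        raise ValueError("this sketch requires a shutdown_event")
    while not shutdown_event.is_set():
        # A real listener would fetch up to num_messages here and run callbacks.
        time.sleep(0.01)


shutdown_event = threading.Event()
worker = threading.Thread(
    target=listen_for_messages,
    kwargs={"num_messages": 5, "shutdown_event": shutdown_event},
)
worker.start()

# Later, for example from a signal handler: ask the listener to stop.
shutdown_event.set()
worker.join(timeout=5)
stopped = not worker.is_alive()
```

With the real function, the same Event object would presumably be passed as shutdown_event and set from whatever code decides the process should exit.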
- hedwig.conf.settings
This object allows settings to be accessed as properties. Settings can be configured in one of three ways:
An environment variable named SETTINGS_MODULE that points to a Python module with settings as module attributes.
Django: if Django can be imported, Django settings are used automatically.
An object or dict, passed to hedwig.conf.settings.configure_with_object().
Some setting values must be string import paths; these are resolved automatically, and accessing the setting returns the class.
Once settings have been configured, they shouldn’t be changed. It is possible to re-configure for testing, but it’s not guaranteed to work the same way for non-test use cases.
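To make "string import paths are resolved to the class" concrete, here is a minimal sketch of the general technique using importlib. The helper name resolve_import_path is made up for this example, and this is not Hedwig's own resolver, only an illustration of what such resolution typically involves.

```python
import importlib


def resolve_import_path(dotted_path):
    """Resolve 'package.module.ClassName' to the object it names."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted import path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_path!r} has no attribute {attr_name!r}"
        ) from exc


# A standard-library class is used so the example runs without Hedwig installed.
resolved = resolve_import_path("collections.OrderedDict")
```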
- hedwig.conf.settings.configure_with_object(obj)
Set Hedwig config using a dataclass-like object that contains all settings as its attributes, or a dict that contains settings as its keys.
- Return type
None
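The two accepted shapes (an object with settings as attributes, or a dict with settings as keys) might look like the sketch below. Only setting names that appear elsewhere in this reference are used, and their values are illustrative assumptions; a real application needs more settings than these. The apply_settings helper is hypothetical and only calls configure_with_object() when Hedwig can be imported.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class HedwigSettings:
    # Illustrative values only; frozen because settings should not change
    # once they have been configured.
    HEDWIG_DATA_VALIDATOR_CLASS: str = "hedwig.validators.jsonschema.JSONSchemaValidator"
    HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES: bool = True


settings_obj = HedwigSettings()
settings_dict = asdict(settings_obj)  # the dict form carries the same names as keys


def apply_settings(obj):
    """Hypothetical helper: configure Hedwig if it is importable, else return False."""
    try:
        from hedwig.conf import settings
    except ImportError:
        return False
    settings.configure_with_object(obj)
    return True
```

Either settings_obj or settings_dict could then be handed to apply_settings() once at start-up.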
- hedwig.conf.settings.clear_cache()
Clear the settings cache. Useful for testing only.
- Return type
None
- hedwig.conf.settings.configured
Whether Hedwig settings have been configured.
- class hedwig.models.Message(data, type, version, id=<factory>, metadata=<factory>)
Model for Hedwig messages. A Message object always has a known message schema and schema version, even if the data may not be valid.
- data: Any
Message data
- type: str
Message type. May be None if the message is invalid
- version: distutils.version.StrictVersion
StrictVersion object representing data schema version.
- id: str
Message identifier
- metadata: hedwig.models.Metadata
Message metadata
- static deserialize(payload, attributes, provider_metadata)
Deserialize a message from on-the-wire payload.
- Return type
Message
- Raise
hedwig.ValidationError if validation fails.
- static deserialize_firehose(line)
Deserialize a message from a line of a firehose file.
- Return type
Message
- Raise
hedwig.ValidationError if validation fails.
- static deserialize_containerized(payload)
Deserialize a message assuming containerized format regardless of configured settings.
- Return type
Message
- Raise
hedwig.ValidationError if validation fails.
- classmethod new(msg_type, version, data, msg_id=None, headers=None)
Creates a Message object given type, data schema version and data. This is typically used by the publisher code.
- Parameters
msg_type (Union[str, Enum]) – Message type (could be an enum, in which case its value is used).
version (StrictVersion) – StrictVersion representing the data schema.
data (Any) – The dict to pass in the data field of Message.
msg_id (Optional[str]) – Custom message identifier. If not passed, a randomly generated UUID is used.
headers (Optional[Dict[str, str]]) – Custom headers (keys must not begin with the reserved namespace hedwig_).
- Return type
Message
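The argument rules for new() listed above (enum values are unwrapped, a missing msg_id is replaced by a random UUID, header keys may not use the hedwig_ namespace) can be sketched as a pre-check. Everything here is hypothetical scaffolding for illustration: MessageType, prepare_new_args and the choice of uuid4 are assumptions, not part of Hedwig.

```python
import uuid
from enum import Enum

RESERVED_PREFIX = "hedwig_"


class MessageType(Enum):
    """Hypothetical message types for this example."""
    USER_CREATED = "user-created"


def prepare_new_args(msg_type, msg_id=None, headers=None):
    """Hypothetical helper mirroring the documented rules for new() arguments."""
    # An enum member is reduced to its value; a plain string is used as-is.
    type_name = msg_type.value if isinstance(msg_type, Enum) else msg_type
    headers = dict(headers or {})
    bad_keys = sorted(k for k in headers if k.startswith(RESERVED_PREFIX))
    if bad_keys:
        raise ValueError(f"headers use the reserved namespace: {bad_keys}")
    # Assumption: a version 4 UUID stands in for "randomly generated".
    return type_name, msg_id or str(uuid.uuid4()), headers


type_name, msg_id, headers = prepare_new_args(
    MessageType.USER_CREATED, headers={"request_id": "abc"}
)
```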
- publish()
Publish this message on Hedwig infra.
- Return type
Union[str, Future]
- Returns
For async publishers, a future that represents the publish API call; otherwise, the published message id.
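Because publish() may return either a message id or a future, calling code that needs the id has to handle both. The sketch below shows one way to do that. It assumes the future exposes result(timeout=...) in the style of concurrent.futures.Future and resolves to the published message id; wait_for_message_id is a hypothetical helper name.

```python
from concurrent.futures import Future


def wait_for_message_id(publish_result, timeout=None):
    """Return the message id whether publish() gave a str or a future-like object."""
    if isinstance(publish_result, str):
        return publish_result
    # Assumption: the future behaves like concurrent.futures.Future and
    # resolves to the published message id.
    return publish_result.result(timeout=timeout)


# Synchronous publisher case: the id is returned directly.
sync_id = wait_for_message_id("id-123")

# Asynchronous publisher case, simulated with an already-completed Future.
pending = Future()
pending.set_result("id-456")
async_id = wait_for_message_id(pending, timeout=1)
```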
- extend_visibility_timeout(visibility_timeout_s)
Extends the visibility timeout of a message for long-running tasks.
- Return type
None
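One way a long-running callback might use this method is to extend the timeout before each slice of slow work, so the message stays invisible to other readers while it is being processed. The sketch uses a FakeMessage stand-in that records calls instead of talking to a queue; the chunk size, the 30 second value and the doubling "work" are arbitrary assumptions.

```python
class FakeMessage:
    """Stand-in exposing the documented extend_visibility_timeout method."""

    def __init__(self, data):
        self.data = data
        self.extensions = []  # records every requested timeout

    def extend_visibility_timeout(self, visibility_timeout_s):
        self.extensions.append(visibility_timeout_s)


def process_in_chunks(message, chunk_size=2, visibility_timeout_s=30):
    """Hypothetical callback: extend the timeout before each chunk of work."""
    items = message.data
    processed = []
    for start in range(0, len(items), chunk_size):
        message.extend_visibility_timeout(visibility_timeout_s)
        # Placeholder for slow work on this chunk.
        processed.extend(item * 2 for item in items[start:start + chunk_size])
    return processed


message = FakeMessage(data=[1, 2, 3, 4, 5])
result = process_in_chunks(message)
```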
- property timestamp: int
- Return type
int
- property headers: Dict[str, str]
- Return type
Dict[str, str]
- property provider_metadata
- property publisher: str
- Return type
str
- serialize()
Serialize a message into the appropriate on-the-wire format.
- Return type
Tuple[Union[str, bytes], dict]
- Returns
Tuple of message payload and transport attributes
- class hedwig.models.Metadata(timestamp=<factory>, publisher=<factory>, headers=<factory>, provider_metadata=None)
- timestamp: int
Timestamp of message creation in epoch milliseconds
- publisher: str
Publisher of message
- headers: Dict[str, str]
Custom headers sent with the message
- provider_metadata: Any = None
Provider-specific metadata, such as an SQS receipt or a Google ack id. This may be used to extend message visibility with Message.extend_visibility_timeout() if the task is running longer than expected.
- class hedwig.validators.jsonschema.JSONSchemaValidator(schema=None)
- checker = <FormatChecker checkers=['date', 'email', 'hostname', 'idn-email', 'idn-hostname', 'ipv4', 'ipv6', 'json-pointer', 'regex', 'relative-json-pointer', 'time']>
FormatChecker that checks the format field of a JSON Schema. An app may customize this by overriding the setting HEDWIG_DATA_VALIDATOR_CLASS and defining more format checkers.
- deserialize(message_payload, attributes, provider_metadata)
Deserialize a message from the on-the-wire format.
- Parameters
message_payload (Union[str, bytes]) – Raw message payload as received from the backend
attributes (dict) – Message attributes from the transport backend
provider_metadata (Any) – Provider-specific metadata
- Return type
Message
- Returns
Message object if valid
- Raise
hedwig.ValidationError if validation fails.
- deserialize_containerized(message_payload)
Deserialize a message assuming containerized format regardless of configured setting.
- Parameters
message_payload (Union[str, bytes]) – Raw message payload as received from the backend
- Return type
Message
- Returns
Message object if valid
- Raise
hedwig.ValidationError if validation fails.
- deserialize_firehose(line)
Deserialize a message from a line in a firehose file. This is slightly different from the on-the-wire format:
It always uses container format (i.e. HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES is ignored).
For binary file formats, it’s possible that data is encoded as a binary base64 blob rather than JSON.
Known minor versions aren’t verified; knowing the major version schema is good enough to read firehose.
- Parameters
line (str) – Raw line read from firehose file
- Return type
Message
- Returns
Message object if valid
- Raise
hedwig.ValidationError if validation fails.
- serialize(message)
Serialize a message into the appropriate on-the-wire format.
- Return type
Tuple[Union[str, bytes], dict]
- Returns
Tuple of message payload and transport attributes
- serialize_containerized(message)
Serialize a message using containerized format regardless of configured settings. In most cases, you just want to use .serialize.
- Return type
Union[str, bytes]
- Returns
Message payload
- serialize_firehose(message)
Serialize a message into the appropriate firehose file format. See hedwig.validators.base.HedwigBaseValidator.deserialize_firehose() for details.
- Return type
str
- Returns
Message serialized in firehose file format
- class hedwig.validators.protobuf.ProtobufValidator(schema_module=None)
A validator that encodes the payload using Protobuf binary format.
- deserialize(message_payload, attributes, provider_metadata)
Deserialize a message from the on-the-wire format.
- Parameters
message_payload (Union[str, bytes]) – Raw message payload as received from the backend
attributes (dict) – Message attributes from the transport backend
provider_metadata (Any) – Provider-specific metadata
- Return type
Message
- Returns
Message object if valid
- Raise
hedwig.ValidationError if validation fails.
- deserialize_containerized(message_payload)
Deserialize a message assuming containerized format regardless of configured setting.
- Parameters
message_payload (Union[str, bytes]) – Raw message payload as received from the backend
- Return type
Message
- Returns
Message object if valid
- Raise
hedwig.ValidationError if validation fails.
- deserialize_firehose(line)
Deserialize a message from a line in a firehose file. This is slightly different from the on-the-wire format:
It always uses container format (i.e. HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES is ignored).
For binary file formats, it’s possible that data is encoded as a binary base64 blob rather than JSON.
Known minor versions aren’t verified; knowing the major version schema is good enough to read firehose.
- Parameters
line (str) – Raw line read from firehose file
- Return type
Message
- Returns
Message object if valid
- Raise
hedwig.ValidationError if validation fails.
- serialize(message)
Serialize a message into the appropriate on-the-wire format.
- Return type
Tuple[Union[str, bytes], dict]
- Returns
Tuple of message payload and transport attributes
- serialize_containerized(message)
Serialize a message using containerized format regardless of configured settings. In most cases, you just want to use .serialize.
- Return type
Union[str, bytes]
- Returns
Message payload
- serialize_firehose(message)
Serialize a message into the appropriate firehose file format. See hedwig.validators.base.HedwigBaseValidator.deserialize_firehose() for details.
- Return type
str
- Returns
Message serialized in firehose file format
- hedwig.commands.requeue_dead_letter(num_messages=10, visibility_timeout=None)
Re-queues everything in the Hedwig DLQ back into the Hedwig queue.
- Parameters
num_messages (int) – Maximum number of messages to fetch in one call. Defaults to 10.
visibility_timeout (Optional[int]) – The number of seconds the message should remain invisible to other queue readers. Defaults to None, which uses the queue default.
- Return type
None
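The batching behaviour implied by num_messages can be pictured with an in-memory model: messages are moved from the DLQ to the main queue in batches of at most num_messages until the DLQ is empty. This is only a model built on collections.deque, written for illustration; requeue_dead_letter_model is a made-up name and the real command works against the configured queue backend.

```python
from collections import deque


def requeue_dead_letter_model(dlq, queue, num_messages=10):
    """In-memory model: move everything from dlq to queue, one batch at a time."""
    batches = 0
    while dlq:
        # Fetch at most num_messages per "call", preserving order.
        batch = [dlq.popleft() for _ in range(min(num_messages, len(dlq)))]
        queue.extend(batch)
        batches += 1
    return batches


dlq = deque(f"msg-{i}" for i in range(25))
queue = deque()
batches = requeue_dead_letter_model(dlq, queue, num_messages=10)
```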
- class hedwig.backends.gcp.GoogleMetadata(ack_id, subscription_path, publish_time, delivery_attempt)
Google Pub/Sub specific metadata for a Message
- ack_id: str
The ID used to ack the message
- subscription_path: str
Path of the Pub/Sub subscription from which this message was pulled
- publish_time: datetime.datetime
Time this message was originally published to Pub/Sub
- delivery_attempt: int
The delivery attempt counter received from Pub/Sub. The first delivery of a given message will have this value as 1. The value is calculated as best effort and is approximate.
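A callback could use delivery_attempt as a hint that a message is being redelivered, for example to log it differently. The sketch below uses a stand-in dataclass with the same four fields rather than the real GoogleMetadata class; the field values and the is_probable_redelivery helper are hypothetical. Since the counter is documented as approximate, the result is best treated as a hint rather than a guarantee.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class GoogleMetadataSketch:
    """Stand-in with the same four fields as the documented GoogleMetadata."""
    ack_id: str
    subscription_path: str
    publish_time: datetime
    delivery_attempt: int


def is_probable_redelivery(metadata):
    """The first delivery is documented as 1, so anything above that is a retry hint."""
    return metadata.delivery_attempt > 1


first = GoogleMetadataSketch(
    ack_id="ack-1",
    subscription_path="projects/example-project/subscriptions/example-subscription",
    publish_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
    delivery_attempt=1,
)
retry = GoogleMetadataSketch(
    ack_id="ack-2",
    subscription_path=first.subscription_path,
    publish_time=first.publish_time,
    delivery_attempt=3,
)
```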
- class hedwig.backends.aws.AWSMetadata(receipt, first_receive_time, sent_time, receive_count)
AWS specific metadata for a Message
- receipt: str
AWS receipt identifier
Testing
- hedwig.testing.pytest_plugin.mock_hedwig_publish()
A pytest fixture that mocks the Hedwig publisher and lets you verify that your test publishes appropriate messages.
- Return type
Generator[HedwigPublishMock, None, None]
- class hedwig.testing.pytest_plugin.HedwigPublishMock(*args, **kw)
Custom mock class used by hedwig.testing.pytest_plugin.mock_hedwig_publish() to mock the publisher.