API reference

hedwig.consumer.listen_for_messages(num_messages=10, visibility_timeout_s=None, shutdown_event=None)

Starts a Hedwig listener for the message types configured in settings and calls the matching callback handler for each message, like so:

callback_fn(message)

The message is explicitly deleted only if the callback function runs successfully. If the callback raises an exception, the message is kept on the queue and processed again. If the callback keeps failing, the message is moved to the dead-letter queue.

This function blocks by default. To stop it, pass an event object as shutdown_event and set that event when the listener should exit.

Parameters
  • num_messages (int) – Maximum number of messages to fetch in one API call. Defaults to 10.

  • visibility_timeout_s (Optional[int]) – The number of seconds the message should remain invisible to other queue readers. Defaults to None, which uses the queue’s default.

  • shutdown_event (Optional[Event]) – An event to signal that the process should shut down. Once it is set, no more messages are de-queued, and the function exits after the current messages have been processed.

Return type

None
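The following in-memory simulation illustrates the delivery contract described above: delete on success, re-queue on failure, and dead-letter after repeated failures. It does not call Hedwig. FakeMessage, simulate_listener and max_receive_count are hypothetical names, and a plain list stands in for the dead-letter queue. The sketch also assumes threading.Event is an acceptable shutdown_event.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeMessage:
    # Hypothetical stand-in for a queued message (not hedwig.models.Message)
    id: str
    receive_count: int = 0


def simulate_listener(
    work: "queue.Queue[FakeMessage]",
    dead_letter: List[FakeMessage],
    callback_fn: Callable[[FakeMessage], None],
    shutdown_event: threading.Event,
    num_messages: int = 10,
    max_receive_count: int = 3,
) -> None:
    """Simulate the delete-on-success contract of listen_for_messages().

    max_receive_count is an assumed redrive limit; in practice the broker
    decides when a message moves to the dead-letter queue. Unlike the real
    listener, this sketch returns once the queue is empty.
    """
    while not shutdown_event.is_set():
        batch = []
        while len(batch) < num_messages:  # at most num_messages per "API call"
            try:
                batch.append(work.get_nowait())
            except queue.Empty:
                break
        if not batch:
            return
        # The whole batch is processed even if shutdown_event is set meanwhile
        for message in batch:
            message.receive_count += 1
            try:
                callback_fn(message)  # success: not re-queued, i.e. "deleted"
            except Exception:
                if message.receive_count >= max_receive_count:
                    dead_letter.append(message)  # keeps failing: move to the DLQ
                else:
                    work.put(message)  # kept on the queue and processed again
```

In real code, you would run listen_for_messages(shutdown_event=event) in a worker and call event.set() from another thread. The listener then stops fetching and returns after finishing its current batch.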

hedwig.consumer.process_messages_for_lambda_consumer(lambda_event)
Return type

None

hedwig.conf.settings

This object allows settings to be accessed as properties. Settings can be configured in one of three ways:

  1. Environment variable named SETTINGS_MODULE that points to a Python module with settings as module attributes

  2. Django - if Django can be imported, Django settings will be used automatically

  3. Using an object or dict, by calling hedwig.conf.settings.configure_with_object()

Some setting values must be string import paths. These are resolved automatically, and the setting returns the imported class.

Once settings have been configured, they shouldn’t be changed. Re-configuring is possible for testing, but it isn’t guaranteed to work the same way in non-test use cases.
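The following sketch shows one way a dotted import-path string can be resolved to the object it names, which is the behavior described for import-path settings. It is an assumption that hedwig does something equivalent; resolve_import_path is a hypothetical helper, not part of hedwig.conf.

```python
import importlib


def resolve_import_path(path: str):
    """Resolve a dotted "package.module.Attribute" string to that attribute.

    Illustrative only; hedwig's own resolver may differ in details such as
    caching and error messages.
    """
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"not a dotted import path: {path!r}")
    module = importlib.import_module(module_path)  # import the containing module
    return getattr(module, attr_name)  # return the class (or other attribute)


# A stdlib class stands in for something like a validator class path
resolved = resolve_import_path("collections.OrderedDict")
```

A setting such as HEDWIG_DATA_VALIDATOR_CLASS would hold a string like "hedwig.validators.jsonschema.JSONSchemaValidator", and reading it would return the class itself.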

hedwig.conf.settings.configure_with_object(obj)

Set Hedwig config using a dataclass-like object that contains all settings as its attributes, or a dict that contains settings as its keys.

Return type

None
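configure_with_object() accepts either an object with settings as attributes or a dict with settings as keys. The sketch below builds both shapes and reads a setting from each. The setting names appear elsewhere in this reference, but the values are illustrative, and read_setting is a hypothetical helper rather than hedwig API.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass
class AppSettings:
    # Setting names appear in this reference; the values are illustrative
    HEDWIG_DATA_VALIDATOR_CLASS: str = "hedwig.validators.jsonschema.JSONSchemaValidator"
    HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES: bool = True


def read_setting(source: Any, name: str) -> Any:
    """Read one setting from a dict (by key) or any other object (by attribute).

    Hypothetical helper showing the two shapes configure_with_object() accepts.
    """
    if isinstance(source, Mapping):
        return source[name]
    return getattr(source, name)


settings_object = AppSettings()
settings_dict = {
    "HEDWIG_DATA_VALIDATOR_CLASS": "hedwig.validators.jsonschema.JSONSchemaValidator",
    "HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES": True,
}
```

You could then pass either settings_object or settings_dict to hedwig.conf.settings.configure_with_object(). A working configuration also needs whatever other settings your backend requires, which this reference does not list.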

hedwig.conf.settings.clear_cache()

Clear settings cache - useful for testing only

Return type

None

hedwig.conf.settings.configured

Have Hedwig settings been configured?

class hedwig.models.Message(data, type, version, id=<factory>, metadata=<factory>)

Model for Hedwig messages. A Message object always has a known message schema and schema version, even if the data may not be valid.

data: Any

Message data

type: str

Message type. May be None if the message is invalid.

version: distutils.version.StrictVersion

StrictVersion object representing data schema version.

id: str

Message identifier

metadata: hedwig.models.Metadata

Message metadata

static deserialize(payload, attributes, provider_metadata)

Deserialize a message from the on-the-wire payload.

Raise

hedwig.ValidationError if validation fails.

Return type

Message

static deserialize_firehose(line)

Deserialize a message from a line of a firehose file.

Raise

hedwig.ValidationError if validation fails.

Return type

Message

static deserialize_containerized(payload)

Deserialize a message assuming the containerized format, regardless of configured settings.

Raise

hedwig.ValidationError if validation fails.

Return type

Message

classmethod new(msg_type, version, data, msg_id=None, headers=None)

Creates Message object given type, data schema version and data. This is typically used by the publisher code.

Parameters
  • msg_type (Union[str, Enum]) – Message type (if an enum is passed, its value is used)

  • version (StrictVersion) – StrictVersion representing data schema

  • data (Any) – The dict to pass in the data field of the Message.

  • msg_id (Optional[str]) – Custom message identifier. If not passed, a randomly generated uuid will be used.

  • headers (Optional[Dict[str, str]]) – Custom headers (keys must not begin with reserved namespace hedwig_)

Return type

Message
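The parameter rules for new() are that an enum type contributes its value, a missing msg_id becomes a random UUID, and headers may not use the hedwig_ prefix. The stdlib sketch below applies those rules. MessageType, normalize_new_args and the choice of ValueError are hypothetical, and the sketch is not hedwig's implementation.

```python
import enum
import uuid
from typing import Dict, Optional, Tuple, Union

RESERVED_PREFIX = "hedwig_"  # reserved header namespace, per the headers parameter


class MessageType(enum.Enum):
    # Hypothetical message type for illustration
    user_created = "user-created"


def normalize_new_args(
    msg_type: Union[str, enum.Enum],
    msg_id: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
) -> Tuple[str, str, Dict[str, str]]:
    """Apply the documented argument rules of Message.new() (sketch only)."""
    type_str = msg_type.value if isinstance(msg_type, enum.Enum) else msg_type
    message_id = msg_id if msg_id is not None else str(uuid.uuid4())  # random uuid default
    clean_headers = dict(headers or {})
    reserved = [key for key in clean_headers if key.startswith(RESERVED_PREFIX)]
    if reserved:
        raise ValueError(f"headers use the reserved {RESERVED_PREFIX!r} namespace: {reserved}")
    return type_str, message_id, clean_headers
```

With the real API, the equivalent call would look like Message.new(MessageType.user_created, StrictVersion("1.0"), {...}, headers={...}).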

publish()

Publish this message on Hedwig infrastructure.

Return type

Union[str, Future]

Returns

For async publishers, a future that represents the publish API call; otherwise, the published message id
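Because publish() returns either a message id or a future, caller code that needs the id has to handle both cases. This sketch assumes the future exposes a concurrent.futures-style result(timeout=...) method and resolves to the message id, which the reference does not state explicitly. published_message_id is a hypothetical helper.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Union


def published_message_id(result: Union[str, Future], timeout: float = 30.0) -> str:
    """Turn the return value of Message.publish() into a message id.

    Sync publishers return the id directly; for async publishers, wait on
    the future (assumed to resolve to the message id).
    """
    if isinstance(result, str):
        return result
    return result.result(timeout=timeout)  # blocks until the publish call completes


# Simulate an async publisher with a thread pool (illustration only)
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(lambda: "message-id-123")
    print(published_message_id(future))  # prints message-id-123
```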

extend_visibility_timeout(visibility_timeout_s)

Extends visibility timeout of a message for long running tasks.

Return type

None

property timestamp: int
Return type

int

property headers: Dict[str, str]
Return type

Dict[str, str]

property provider_metadata
property publisher: str
Return type

str

serialize()

Serialize a message for the appropriate on-the-wire format.

Return type

Tuple[Union[str, bytes], dict]

Returns

Tuple of message payload and transport attributes

serialize_containerized()

Serialize a message using the containerized format, regardless of configured settings. In most cases, use serialize() instead.

Return type

Union[str, bytes]

Returns

Message payload

serialize_firehose()

Serialize a message for the appropriate firehose file format. See hedwig.validators.base.HedwigBaseValidator.deserialize_firehose() for details.

Return type

str

Returns

Message payload formatted as a single firehose line

class hedwig.models.Metadata(timestamp=<factory>, publisher=<factory>, headers=<factory>, provider_metadata=None)
timestamp: int

Timestamp of message creation in epoch milliseconds

publisher: str

Publisher of message

headers: Dict[str, str]

Custom headers sent with the message

provider_metadata: Any = None

Provider-specific metadata, such as an SQS receipt or a Google ack ID. If a task runs longer than expected, this may be used to extend the message’s visibility with Message.extend_visibility_timeout().
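Metadata.timestamp is in epoch milliseconds, not seconds, so it has to be divided by 1000 before the standard library can interpret it. The following stdlib sketch converts in both directions. The helper names are hypothetical.

```python
import time
from datetime import datetime, timezone


def metadata_timestamp_to_datetime(timestamp_ms: int) -> datetime:
    """Convert Metadata.timestamp (epoch milliseconds) to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


def now_epoch_ms() -> int:
    """Current time in epoch milliseconds, the unit Metadata.timestamp uses."""
    return time.time_ns() // 1_000_000


print(metadata_timestamp_to_datetime(1_500))  # 1970-01-01 00:00:01.500000+00:00
```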

class hedwig.validators.jsonschema.JSONSchemaValidator(schema=None)
checker = <FormatChecker checkers=['date', 'email', 'hostname', 'idn-email', 'idn-hostname', 'ipv4', 'ipv6', 'json-pointer', 'regex', 'relative-json-pointer', 'time']>

FormatChecker that validates the format keyword of JSON-schema fields. An app may customize it by overriding the HEDWIG_DATA_VALIDATOR_CLASS setting and defining additional format checkers.

deserialize(message_payload, attributes, provider_metadata)

Deserialize a message from the on-the-wire format.

Parameters
  • message_payload (Union[str, bytes]) – Raw message payload as received from the backend

  • attributes (dict) – Message attributes from the transport backend

  • provider_metadata (Any) – Provider specific metadata

Return type

Message

Returns

Message object if valid

Raise

hedwig.ValidationError if validation fails.

deserialize_containerized(message_payload)

Deserialize a message assuming the containerized format, regardless of configured settings.

Parameters

message_payload (Union[str, bytes]) – Raw message payload as received from the backend

Return type

Message

Returns

Message object if valid

Raise

hedwig.ValidationError if validation fails.

deserialize_firehose(line)

Deserialize a message from a line in firehose file. This is slightly different from on-the-wire format:

  1. It always uses container format (i.e. HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES is ignored)

  2. For binary file formats, it’s possible that data is encoded as a binary base64 blob rather than JSON.

  3. Known minor versions aren’t verified; knowing the major version schema is enough to read firehose.

Parameters

line (str) – Raw line read from firehose file

Return type

Message

Returns

Message object if valid

Raise

hedwig.ValidationError if validation fails.
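Rule 3 above relaxes version checking for firehose reads. The sketch below contrasts the two modes under an assumption that the on-the-wire path requires the exact major.minor version to be known. It parses plain "major.minor" strings instead of StrictVersion, because distutils is no longer available in recent Python versions. can_read and parse_version are hypothetical helpers, not hedwig's validator logic.

```python
from typing import Iterable, Tuple


def parse_version(version: str) -> Tuple[int, int]:
    """Parse a "major.minor" string, e.g. "1.3" -> (1, 3)."""
    major, minor = version.split(".")
    return int(major), int(minor)


def can_read(message_version: str, known_versions: Iterable[str], firehose: bool) -> bool:
    """Decide whether a message's schema version is readable.

    Firehose: a known major version is enough. On the wire (assumed): the
    exact major.minor pair must be known.
    """
    major, minor = parse_version(message_version)
    known = {parse_version(v) for v in known_versions}
    if firehose:
        return any(known_major == major for known_major, _ in known)
    return (major, minor) in known
```

For example, a message at version 1.3 is readable from firehose when only 1.0 and 1.1 schemas are known, but not on the wire.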

serialize(message)

Serialize a message for the appropriate on-the-wire format.

Return type

Tuple[Union[str, bytes], dict]

Returns

Tuple of message payload and transport attributes

serialize_containerized(message)

Serialize a message using the containerized format, regardless of configured settings. In most cases, use serialize() instead.

Return type

Union[str, bytes]

Returns

Message payload

serialize_firehose(message)

Serialize a message for the appropriate firehose file format. See hedwig.validators.base.HedwigBaseValidator.deserialize_firehose() for details.

Return type

str

Returns

Message payload formatted as a single firehose line

class hedwig.validators.protobuf.ProtobufValidator(schema_module=None)

A validator that encodes the payload using Protobuf binary format.

deserialize(message_payload, attributes, provider_metadata)

Deserialize a message from the on-the-wire format.

Parameters
  • message_payload (Union[str, bytes]) – Raw message payload as received from the backend

  • attributes (dict) – Message attributes from the transport backend

  • provider_metadata (Any) – Provider specific metadata

Return type

Message

Returns

Message object if valid

Raise

hedwig.ValidationError if validation fails.

deserialize_containerized(message_payload)

Deserialize a message assuming the containerized format, regardless of configured settings.

Parameters

message_payload (Union[str, bytes]) – Raw message payload as received from the backend

Return type

Message

Returns

Message object if valid

Raise

hedwig.ValidationError if validation fails.

deserialize_firehose(line)

Deserialize a message from a line in firehose file. This is slightly different from on-the-wire format:

  1. It always uses container format (i.e. HEDWIG_USE_TRANSPORT_MESSAGE_ATTRIBUTES is ignored)

  2. For binary file formats, it’s possible that data is encoded as a binary base64 blob rather than JSON.

  3. Known minor versions aren’t verified; knowing the major version schema is enough to read firehose.

Parameters

line (str) – Raw line read from firehose file

Return type

Message

Returns

Message object if valid

Raise

hedwig.ValidationError if validation fails.

serialize(message)

Serialize a message for the appropriate on-the-wire format.

Return type

Tuple[Union[str, bytes], dict]

Returns

Tuple of message payload and transport attributes

serialize_containerized(message)

Serialize a message using the containerized format, regardless of configured settings. In most cases, use serialize() instead.

Return type

Union[str, bytes]

Returns

Message payload

serialize_firehose(message)

Serialize a message for the appropriate firehose file format. See hedwig.validators.base.HedwigBaseValidator.deserialize_firehose() for details.

Return type

str

Returns

Message payload formatted as a single firehose line

hedwig.commands.requeue_dead_letter(num_messages=10, visibility_timeout=None)

Re-queues everything in the Hedwig DLQ back into the Hedwig queue.

Parameters
  • num_messages (int) – Maximum number of messages to fetch in one call. Defaults to 10.

  • visibility_timeout (Optional[int]) – The number of seconds the message should remain invisible to other queue readers. Defaults to None, which uses the queue’s default.

Return type

None
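Conceptually, re-queueing drains the dead-letter queue in batches of at most num_messages and re-publishes each batch to the main queue. The in-memory sketch below models that loop with a deque and a list standing in for the real queues. drain_dead_letter is a hypothetical name, and the sketch does not reflect how requeue_dead_letter talks to SQS or Pub/Sub.

```python
from collections import deque
from typing import Deque, List, Tuple


def drain_dead_letter(
    dead_letter: Deque[str], main_queue: List[str], num_messages: int = 10
) -> Tuple[int, int]:
    """Move every message from a DLQ stand-in onto a main-queue stand-in.

    Each loop iteration models one receive call that fetches at most
    num_messages. Returns (messages moved, receive calls made).
    """
    moved = calls = 0
    while dead_letter:
        calls += 1
        batch = [dead_letter.popleft() for _ in range(min(num_messages, len(dead_letter)))]
        main_queue.extend(batch)  # re-publish the batch onto the main queue
        moved += len(batch)
    return moved, calls
```

With 25 dead-lettered messages and the default num_messages=10, the loop makes three receive calls.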

class hedwig.backends.gcp.GoogleMetadata(ack_id, subscription_path, publish_time, delivery_attempt)

Google Pub/Sub specific metadata for a Message

ack_id: str

The ID used to ack the message

subscription_path: str

Path of the Pub/Sub subscription from which this message was pulled

publish_time: datetime.datetime

Time this message was originally published to Pub/Sub

delivery_attempt: int

The delivery attempt counter received from Pub/Sub. The first delivery of a given message will have this value as 1. The value is calculated as best effort and is approximate.

class hedwig.backends.aws.AWSMetadata(receipt, first_receive_time, sent_time, receive_count)

AWS specific metadata for a Message

receipt: str

AWS receipt identifier

Testing

hedwig.testing.pytest_plugin.mock_hedwig_publish()

A pytest fixture that mocks Hedwig publisher and lets you verify that your test publishes appropriate messages.

Return type

Generator[HedwigPublishMock, None, None]

class hedwig.testing.pytest_plugin.HedwigPublishMock(*args, **kw)

Custom mock class used by hedwig.testing.pytest_plugin.mock_hedwig_publish() to mock the publisher.

assert_message_not_published(msg_type, data=None, version=StrictVersion('1.0'))

Helper function to check that a Hedwig message of the given type, data and schema version was NOT sent.

Return type

None

assert_message_published(msg_type, data=None, version=StrictVersion('1.0'))

Helper function to check that a Hedwig message with the given type, data and schema version was sent.

Return type

None
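In a real test you would request the mock_hedwig_publish fixture and call its assert_message_published() or assert_message_not_published() methods. The stdlib sketch below models how such assertions could match recorded publishes. It assumes data=None means "any data", which is not confirmed here. Versions are plain strings instead of StrictVersion. RecordingPublisher is hypothetical and is not HedwigPublishMock.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class RecordingPublisher:
    """Hypothetical in-memory publisher recording (type, version, data) tuples."""

    published: List[Tuple[str, str, Any]] = field(default_factory=list)

    def publish(self, msg_type: str, data: Any, version: str = "1.0") -> None:
        self.published.append((msg_type, version, data))

    def _matches(self, msg_type: str, data: Optional[Any], version: str) -> bool:
        # Assumption: data=None matches any payload of that type and version
        return any(
            t == msg_type and v == version and (data is None or d == data)
            for t, v, d in self.published
        )

    def assert_message_published(self, msg_type: str, data: Any = None, version: str = "1.0") -> None:
        assert self._matches(msg_type, data, version), f"{msg_type} v{version} was not published"

    def assert_message_not_published(self, msg_type: str, data: Any = None, version: str = "1.0") -> None:
        assert not self._matches(msg_type, data, version), f"{msg_type} v{version} was published"
```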

hedwig.testing.config.unconfigure()

If settings were configured, un-configure them - useful for testing only.

Return type

None

Exceptions

class hedwig.exceptions.RetryException(*args, **kwargs)

Special exception that does not log an exception when it is received. This is a retryable error.

class hedwig.exceptions.IgnoreException

Indicates that this task should be ignored.

class hedwig.exceptions.ValidationError

Message failed JSON schema validation

class hedwig.exceptions.ConfigurationError

There was a problem with the settings.

class hedwig.exceptions.CallbackNotFound

No callback found that can handle the given message. Check your CALLBACKS settings.