Broker

Defines the broker class, which interacts with a ddmq directory. You create a broker by supplying at least a root directory, for example:

>>> import ddmq
>>> b = ddmq.broker('../temp/ddmq', create=True)
>>> print(b)
create = True
default_settings = {'priority': 999, 'requeue': True, 'requeue_prio': 0, 'message_timeout': 600, 'cleaned': 0}
global_settings = {'priority': 999, 'requeue': True, 'requeue_prio': 0, 'message_timeout': 600, 'cleaned': 0}
queue_settings = {}
root = ../temp/ddmq
>>> b.publish('queue_name', "Hello World!")
filename = queue_name/999.1.ddmq89723438b9d0403c91943f4ffaf8ba35
id = 89723438b9d0403c91943f4ffaf8ba35
message = Hello World!
priority = 999
queue = queue_name
queue_number = 123456782356356256566
requeue = False
requeue_counter = 0
requeue_limit = None
timeout = None
>>> msg = b.consume('queue_name')
>>> print(msg)
filename = 1539702458.999.1.ddmq89723438b9d0403c91943f4ffaf8ba35
id = 89723438b9d0403c91943f4ffaf8ba35
message = Hello World!
priority = 999
queue = queue_name
queue_number = 1234567823561341341356
requeue = False
requeue_counter = 0
requeue_limit = None
timeout = None
>>> print(msg.message)
Hello World!
exception broker.DdmqError(message, error)

Helper class to raise custom errors

class broker.broker(root, create=False, verbose=False, debug=False)

Class to interact with messaging queues

ack(queue, msg_files=None, requeue=False, skip_cleaning=False)

Positive acknowledgement of message(s)

Parameters:
  • queue – name of the queue the files are in, or the message object to be acked
  • msg_files – either a single path or a list of paths to message(s) to ack
  • requeue – True will force the message(s) to be requeued, False will force the message(s) to be purged, and None will leave it up to each message whether it is requeued. The signature lists False as the default.
  • skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just ack the message(s) right away and not bother doing any cleaning first (faster).
Returns:

a list of file names of all messages acknowledged
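
The three-way behaviour of the requeue parameter can be sketched as a small decision function. This is a minimal illustration only: should_requeue and message_requeue are hypothetical names, not part of ddmq, and the library's internal logic may differ.

```python
from typing import Optional


def should_requeue(override: Optional[bool], message_requeue: bool) -> bool:
    """Decide whether a message goes back on the queue (hypothetical helper).

    override        -- the requeue argument: True, False, or None
    message_requeue -- the requeue flag stored on the message itself
    """
    if override is None:
        # No override given: the message's own flag decides.
        return message_requeue
    # True forces a requeue, False forces a purge.
    return override


print(should_requeue(True, False))   # forced requeue
print(should_requeue(False, True))   # forced purge
print(should_requeue(None, True))    # the message decides
```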

check_dir(path, only_conf=False)

Check if the directory contains a ddmq.yaml file to avoid littering non-queue dirs

Parameters:
  • path – path to the directory to check
  • only_conf – if True, only check if the ddmq.yaml file is present. If False, also check that there is a subdirectory called ‘work’
Returns:

None
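
The check described above could look roughly like the following sketch. The helper name looks_like_ddmq_dir is hypothetical, and it returns a boolean purely for illustration, which may differ from what check_dir itself does.

```python
import tempfile
from pathlib import Path


def looks_like_ddmq_dir(path, only_conf=False):
    """Hypothetical helper: True if path holds ddmq.yaml and, unless only_conf, a work/ subdirectory."""
    path = Path(path)
    if not (path / "ddmq.yaml").is_file():
        return False
    if only_conf:
        return True
    return (path / "work").is_dir()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "ddmq.yaml").touch()
    conf_only = looks_like_ddmq_dir(root, only_conf=True)
    before_work = looks_like_ddmq_dir(root)       # no work/ directory yet
    (root / "work").mkdir()
    after_work = looks_like_ddmq_dir(root)

print(conf_only, before_work, after_work)
```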

clean(queue, force=False)

Clean out expired messages from a specified queue

Parameters: queue – name of the queue to clean
Returns: True if everything goes according to plan, False if no cleaning was done

clean_all()

Clean all the queues in the root directory

Parameters: None
Returns: None

consume(queue, n=1, skip_cleaning=False, path=None)

Consume one or more messages from a specified queue. The consumed messages are moved to the queue’s work folder and have the expiry epoch time prepended to the file name.

Parameters:
  • queue – name of the queue to consume from
  • n – the number (int) of messages to consume
  • skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just consume the message(s) right away and not bother doing any cleaning first (faster)
  • path – specified path to message file to consume, instead of fetching the next message in line
Returns:

a single message object if n=1 (default), or a list of the messages that were fetched if n > 1
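
The renaming step can be illustrated with a short sketch. It assumes the expiry epoch is the consume time plus the message timeout in whole seconds, joined to the original file name with a dot, as the file names in the example at the top suggest. The function names are hypothetical, and the value of now is chosen so that the result matches that example with the default message_timeout of 600.

```python
def work_filename(queue_filename, timeout, now):
    """Prepend the expiry epoch (now + timeout, whole seconds) to a file name."""
    expiry = int(now) + int(timeout)
    return f"{expiry}.{queue_filename}"


def is_expired(work_name, now):
    """Assumed rule: a work file is expired once the time reaches its prefixed epoch."""
    expiry = int(work_name.split(".", 1)[0])
    return now >= expiry


name = work_filename(
    "999.1.ddmq89723438b9d0403c91943f4ffaf8ba35", timeout=600, now=1539701858
)
print(name)
print(is_expired(name, now=1539702457), is_expired(name, now=1539702458))
```

Keeping the expiry time in the file name means expired messages can be found by listing the work directory, without opening any message file.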

create_folder(path)

Create a folder at a specified path

Parameters: path – path to the directory to be created
Returns: None

create_queue(queue)

Create a specified queue

Parameters: queue – name of the queue to create
Returns: True if everything goes according to plan

delete_message(path)

Delete a specified message

Parameters: path – path to the message, or a message object, to be deleted
Returns: None

delete_queue(queue)

Delete a specified queue

Parameters: queue – name of the queue to delete
Returns: True if everything goes according to plan

get_config_file(queue='')

Get the settings from the config file of a queue or the root dir

Parameters: queue – if empty, returns the config file from the root folder. If a queue name, will get the config file for that queue
Returns: a dict containing all the settings specified in the config file

get_message(path)

Get a specified message

Parameters: path – path to the message to fetch
Returns: the requested message

get_message_list(queue)

Gets a list of all messages in the specified queue

Parameters: queue – name of the queue to get messages from
Returns: two lists of file names. The first lists all messages still waiting in the queue, and the second lists all messages in the queue’s work directory

get_queue_number()

Generate the next incremental queue number for a specified queue (epoch time of creation without the decimal punctuation)

Parameters: None
Returns: a string that is the current timestamp, with the decimal punctuation removed
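
As a rough illustration of "timestamp with the decimal punctuation removed", the sketch below formats an epoch time and strips the decimal point. The helper name queue_number_from is hypothetical, and the exact formatting and precision ddmq uses may differ (the queue numbers in the example at the top are longer).

```python
import time


def queue_number_from(timestamp):
    """Render an epoch timestamp as a digit string with the decimal point removed."""
    return repr(float(timestamp)).replace(".", "")


print(queue_number_from(1539702458.25))

# The same transformation applied to the current time.
current = queue_number_from(time.time())
print(current.isdigit())
```
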
get_settings(queue)

Get the settings for the specified queue. Will try to give a cached version first, and if it is the first time the settings are requested it will read the settings from the config file and store the result

Parameters: queue – name of the queue to get settings for
Returns: None
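
The read-once-then-cache behaviour can be sketched as follows. SettingsCache and fake_loader are hypothetical stand-ins, not part of ddmq. The sketch returns the settings dict from get for illustration, whereas the documentation above lists None as the return value of get_settings.

```python
class SettingsCache:
    """Hypothetical cache: load a queue's settings once, then reuse them."""

    def __init__(self, loader):
        self._loader = loader   # callable that reads the settings for a queue
        self._cache = {}        # queue name -> settings dict

    def get(self, queue):
        if queue not in self._cache:
            # First request for this queue: read and remember the result.
            self._cache[queue] = self._loader(queue)
        return self._cache[queue]


calls = []


def fake_loader(queue):
    """Stand-in for reading a config file; records each call."""
    calls.append(queue)
    return {"priority": 999, "message_timeout": 600}


cache = SettingsCache(fake_loader)
first = cache.get("queue_name")
second = cache.get("queue_name")   # served from the cache, no second read
print(len(calls))
```
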
list_queues()

Generate a list of all valid queues (subdirectories with ddmq.yaml files in them) in the root folder

Parameters: None
Returns: a list of names of valid queues
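
The validity rule (a subdirectory counts as a queue only if it contains a ddmq.yaml file) can be sketched with the standard library. The helper name find_queues is hypothetical, and sorting the result is an assumption made here for predictable output.

```python
import tempfile
from pathlib import Path


def find_queues(root):
    """Return sorted names of subdirectories of root that contain a ddmq.yaml file."""
    return sorted(
        entry.name
        for entry in Path(root).iterdir()
        if entry.is_dir() and (entry / "ddmq.yaml").is_file()
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("jobs", "emails"):
        (root / name).mkdir()
        (root / name / "ddmq.yaml").touch()
    (root / "not_a_queue").mkdir()   # no ddmq.yaml, so it is skipped
    queues = find_queues(root)

print(queues)
```
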
nack(queue, msg_files=None, requeue=False, skip_cleaning=False)

Negative acknowledgement of message(s)

Parameters:
  • queue – name of the queue the files are in, or the message object to be nacked
  • msg_files – either a single path or a list of paths to message(s) to nack
  • requeue – True will force the message(s) to be requeued, False will force the message(s) to be purged, and None will leave it up to each message whether it is requeued. The signature lists False as the default.
  • skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just nack the message(s) right away and not bother doing any cleaning first (faster).
Returns:

True if everything goes according to plan

publish(queue, msg_text=None, priority=None, skip_cleaning=True, requeue=True, requeue_prio=None, timeout=None, requeue_counter=0, requeue_limit=None)

Publish a message to a queue

Parameters:
  • queue – name of the queue to publish to
  • msg_text – the actual message
  • priority – the priority of the message (default 999). Lower number means higher priority when processing
  • skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just publish the message right away and not bother doing any cleaning first (faster).
  • requeue – if True, the message will be requeued after it expires. If False, it will just be deleted.
  • requeue_prio – if set (int), the message will get this priority when requeued. Default is 0, meaning requeued messages will be put first in the queue.
  • timeout – if set (int), will override the global and queue specific default setting for how many seconds a message expires after.
Returns:

a copy of the message published
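
The effect of priority and requeue_prio on processing order can be sketched as a sort. The Message tuple and consume_order are hypothetical, and breaking ties by queue number (oldest first) is an assumption, not something stated above.

```python
from collections import namedtuple

Message = namedtuple("Message", ["priority", "queue_number", "text"])


def consume_order(messages):
    """Sort so the lowest priority number comes first; ties go to the lower queue number."""
    return sorted(messages, key=lambda m: (m.priority, m.queue_number))


waiting = [
    Message(999, 3, "default priority, published last"),
    Message(999, 1, "default priority, published first"),
    Message(0, 2, "requeued with requeue_prio 0"),
]

# The requeued message jumps ahead of both default-priority messages.
for message in consume_order(waiting):
    print(message.priority, message.text)
```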

purge_queue(queue)

Purge the specified queue of all messages, but keep the queue folders and config file

Parameters: queue – name of the queue to purge
Returns: a list of two numbers: the first is how many messages still waiting in the queue were deleted, and the second is how many messages in the queue’s work directory were deleted

requeue_message(path, msg=None)

Requeue a specified message

Parameters: path – path to the message to requeue
Returns: True if everything goes according to plan

update_settings_file(queue='', package={})

Update the settings in a config file for a specified queue or in the root dir

Parameters:
  • queue – if empty, change the config in the root folder. If a queue name, will change the config for that queue
  • package – a dict containing the changes to the config file
Returns:

None
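
One way to picture how a package of changes relates to the default, global, and queue settings shown in the example at the top is as layered dict updates. The precedence used here (defaults, then global, then queue, then the package) is an assumption, and effective_settings is a hypothetical helper that merges in memory without writing any file.

```python
def effective_settings(default_settings, global_settings, queue_settings, package=None):
    """Merge settings layers; later layers override earlier ones (assumed order)."""
    merged = dict(default_settings)
    merged.update(global_settings)
    merged.update(queue_settings)
    merged.update(package or {})   # changes to apply on top
    return merged


defaults = {
    "priority": 999,
    "requeue": True,
    "requeue_prio": 0,
    "message_timeout": 600,
    "cleaned": 0,
}
result = effective_settings(defaults, dict(defaults), {}, package={"message_timeout": 30})
print(result["message_timeout"], result["priority"])
```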

version()

Get package version

Parameters: None
Returns: the package version