Broker
Defines the broker class, which interacts with a ddmq directory. You create a broker by supplying at least a root directory, for example:
>>> import ddmq
>>> b = ddmq.broker('../temp/ddmq', create=True)
>>> print(b)
create = True
default_settings = {'priority': 999, 'requeue': True, 'requeue_prio': 0, 'message_timeout': 600, 'cleaned': 0}
global_settings = {'priority': 999, 'requeue': True, 'requeue_prio': 0, 'message_timeout': 600, 'cleaned': 0}
queue_settings = {}
root = ../temp/ddmq
>>> b.publish('queue_name', "Hello World!")
filename = queue_name/999.1.ddmq89723438b9d0403c91943f4ffaf8ba35
id = 89723438b9d0403c91943f4ffaf8ba35
message = Hello World!
priority = 999
queue = queue_name
queue_number = 123456782356356256566
requeue = False
requeue_counter = 0
requeue_limit = None
timeout = None
>>> msg = b.consume('queue_name')
>>> print(msg)
filename = 1539702458.999.1.ddmq89723438b9d0403c91943f4ffaf8ba35
id = 89723438b9d0403c91943f4ffaf8ba35
message = Hello World!
priority = 999
queue = queue_name
queue_number = 1234567823561341341356
requeue = False
requeue_counter = 0
requeue_limit = None
timeout = None
>>> print(msg.message)
Hello World!
-
class broker.broker(root, create=False, verbose=False, debug=False)
Class to interact with messaging queues
-
ack(queue, msg_files=None, requeue=False, skip_cleaning=False)
Positive acknowledgement of message(s)
Parameters:
- queue – name of the queue the files are in, or the message object to be acked
- msg_files – either a single path or a list of paths to message(s) to ack
- requeue – True will force message(s) to be requeued, False will force messages to be purged, None (default) will leave it up to the message itself if it should be requeued or not
- skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just ack the message(s) right away and not bother doing any cleaning first (faster).
Returns: a list of file names of all messages acknowledged
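The three-way `requeue` argument described above can be pictured as a small decision function. This is a minimal sketch and not ddmq's own code; `should_requeue` and `message_requeue` are hypothetical names used only for illustration.

```python
def should_requeue(requeue, message_requeue):
    """Decide whether a message goes back on the queue.

    requeue         -- the caller's override: True, False or None
    message_requeue -- the requeue flag stored in the message itself
    (both names are hypothetical, for illustration only)
    """
    if requeue is None:
        # No override given: leave the decision to the message.
        return bool(message_requeue)
    # True forces a requeue, False forces a purge.
    return bool(requeue)
```

With this sketch, `should_requeue(None, True)` defers to the message and requeues it, while `should_requeue(False, True)` purges it regardless of the message's own flag.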
-
check_dir(path, only_conf=False)
Check if the directory contains a ddmq.yaml file to avoid littering non-queue dirs
Parameters:
- path – path to the directory to check
- only_conf – if True, only check if the ddmq.yaml file is present. If False, also check that there is a subdirectory called ‘work’
Returns: None
-
clean(queue, force=False)
Clean out expired messages from a specified queue
Parameters: queue – name of the queue to clean
Returns: True if everything goes according to plan, False if no cleaning was done
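Since consumed messages carry their expiry epoch time at the front of the file name (see consume), finding expired messages can be sketched as a scan over the work directory's file names. This is an illustration under stated assumptions, not ddmq's own code: the helper name `expired_messages` is hypothetical, and treating a message as expired when its epoch is strictly less than the current time is an assumption.

```python
def expired_messages(work_files, now):
    """Return the file names whose leading expiry epoch lies in the past."""
    expired = []
    for name in work_files:
        # Assumed layout: <expiry epoch>.<rest of the original file name>
        expiry = int(name.split(".", 1)[0])
        if expiry < now:  # assumption: strictly earlier than now means expired
            expired.append(name)
    return expired
```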
-
consume(queue, n=1, skip_cleaning=False, path=None)
Consume one (or more) messages from a specified queue. The consumed messages will be moved to the queue’s work folder and have the expiry epoch time prepended to the file name.
Parameters:
- queue – name of the queue to consume from
- n – the number (int) of messages to consume
- skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just consume the message(s) right away and not bother doing any cleaning first (faster)
- path – specified path to message file to consume, instead of fetching the next message in line
Returns: a single message object if n=1 (default), or a list of the messages that were fetched if n > 1
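The renaming step described above (expiry epoch time prepended to the file name) can be sketched as follows. This is a minimal illustration, not ddmq's own code: `work_filename` is a hypothetical helper, and computing the expiry as the current time plus the message timeout in whole seconds is an assumption.

```python
import time


def work_filename(filename, timeout, now=None):
    """Build the name a consumed message would get in the work folder."""
    if now is None:
        now = time.time()
    # Assumption: expiry = time of consumption + timeout, in whole seconds.
    expiry = int(now) + int(timeout)
    return "{}.{}".format(expiry, filename)
```

For instance, a message named `999.1.ddmq89723438b9d0403c91943f4ffaf8ba35` consumed at epoch 1539701858 with a 600 second timeout would be renamed to `1539702458.999.1.ddmq89723438b9d0403c91943f4ffaf8ba35`, which matches the shape of the file name in the example at the top of this page.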
-
create_folder(path)
Create a folder at a specified path
Parameters: path – path to the directory to be created
Returns: None
-
create_queue(queue)
Create a specified queue
Parameters: queue – name of the queue to create
Returns: True if everything goes according to plan
-
delete_message(path)
Delete a specified message
Parameters: path – path to the message, or a message object, to be deleted
Returns: None
-
delete_queue(queue)
Delete a specified queue
Parameters: queue – name of the queue to delete
Returns: True if everything goes according to plan
-
get_config_file(queue='')
Get the settings from the config file of a queue or the root dir
Parameters: queue – if empty, returns the config file from the root folder. If a queue name, will get the config file for that queue
Returns: a dict containing all the settings specified in the config file
-
get_message(path)
Get a specified message
Parameters: path – path to the message to fetch
Returns: the requested message
-
get_message_list(queue)
Get a list of all messages in the specified queue
Parameters: queue – name of the queue to get messages from
Returns: two lists of file names. The first is the list of all messages still waiting in the queue and the second is the list of all messages in the queue’s work directory
-
get_queue_number()
Generate the next incremental queue number for a specified queue (epoch time of creation without the decimal punctuation)
Parameters: None
Returns: a string that is the current timestamp, with the decimal punctuation removed
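A number of the kind described above (the current timestamp with the decimal point removed) can be produced with the standard library alone. This is a sketch under assumptions, not ddmq's own code: `queue_number` is a hypothetical helper, and the optional `timestamp` argument exists only to make the example reproducible.

```python
import time


def queue_number(timestamp=None):
    """Return a timestamp as a string with the decimal point removed."""
    if timestamp is None:
        timestamp = time.time()
    # str() of an epoch float looks like '1539702458.25'; drop the '.'
    return str(timestamp).replace(".", "")
```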
-
get_settings(queue)
Get the settings for the specified queue. Will try to give a cached version first, and if it is the first time the settings are requested it will read the settings from the config file and store the result
Parameters: queue – name of the queue to get settings for
Returns: None
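The read-once-then-cache behaviour described above follows a common pattern, sketched here in isolation. It is not ddmq's own code: `SettingsCache` and the `load_from_file` callable are hypothetical stand-ins for the broker and its config file reader.

```python
class SettingsCache:
    """Cache per-queue settings so each config file is read only once."""

    def __init__(self, load_from_file):
        # load_from_file: hypothetical callable mapping a queue name to a dict
        self._load = load_from_file
        self._cache = {}

    def get(self, queue):
        if queue not in self._cache:
            # First request for this queue: read the file and keep the result.
            self._cache[queue] = self._load(queue)
        return self._cache[queue]
```

A second call to `get` with the same queue name returns the stored dict without invoking the loader again, so changes made to the file after the first read are not picked up by this sketch.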
-
list_queues()
Generate a list of all valid queues (subdirectories with ddmq.yaml files in them) in the root folder
Parameters: None
Returns: a list of names of valid queues
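The validity rule above (a subdirectory counts as a queue only if it holds a ddmq.yaml file) can be sketched with the standard library. This is an illustration rather than ddmq's own code; `list_valid_queues` is a hypothetical name, and sorting the result is a choice made here for predictable output.

```python
import os


def list_valid_queues(root):
    """Return names of subdirectories of root that contain a ddmq.yaml file."""
    queues = []
    for name in sorted(os.listdir(root)):
        path = os.path.join(root, name)
        # Plain files and directories without a ddmq.yaml are skipped.
        if os.path.isdir(path) and os.path.isfile(os.path.join(path, "ddmq.yaml")):
            queues.append(name)
    return queues
```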
-
nack(queue, msg_files=None, requeue=False, skip_cleaning=False)
Negative acknowledgement of message(s)
Parameters:
- queue – name of the queue the files are in, or the message object to be nacked
- msg_files – either a single path or a list of paths to message(s) to nack
- requeue – True will force message(s) to be requeued, False will force messages to be purged, None (default) will leave it up to the message itself if it should be requeued or not
- skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just nack the message(s) right away and not bother doing any cleaning first (faster).
Returns: True if everything goes according to plan
-
publish(queue, msg_text=None, priority=None, skip_cleaning=True, requeue=True, requeue_prio=None, timeout=None, requeue_counter=0, requeue_limit=None)
Publish a message to a queue
Parameters:
- queue – name of the queue to publish to
- msg_text – the actual message
- priority – the priority of the message (default 999). A lower number means higher priority when processing
- skip_cleaning – if False, the client will first clean out any expired messages from the queue’s work directory. If True, the client will just publish the message right away and not bother doing any cleaning first (faster).
- requeue – if True, the message will be requeued after it expires. If False, it will just be deleted.
- requeue_prio – if set (int), the message will get this priority when requeued. Default is 0, meaning requeued messages will be put first in the queue.
- timeout – if set (int), will override the global and queue-specific default settings for how many seconds pass before a message expires.
Returns: a copy of the message published
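The priority rule above (a lower number is processed first) can be illustrated by sorting message file names. This is a sketch under assumptions and not ddmq's own code: `processing_order` is a hypothetical helper, and the file name layout `<priority>.<number>.ddmq<id>` is inferred from the example at the top of this page, with the middle field assumed to be an ordering number.

```python
def processing_order(filenames):
    """Sort message file names so the next message to process comes first."""

    def sort_key(name):
        # Assumed layout: <priority>.<ordering number>.ddmq<id>
        priority, number, _rest = name.split(".", 2)
        # Compare numerically so that priority 10 sorts before 999.
        return (int(priority), int(number))

    return sorted(filenames, key=sort_key)
```

Under this layout, a message requeued with the default `requeue_prio` of 0 sorts ahead of messages published with the default priority of 999.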
-
purge_queue(queue)
Purge the specified queue of all messages, but keep the queue folders and config file
Parameters: queue – name of the queue to purge
Returns: a list of two numbers: the first is how many messages still waiting in the queue were deleted, and the second is how many messages in the queue’s work directory were deleted
-
requeue_message(path, msg=None)
Requeue a specified message
Parameters: path – path to the message to requeue
Returns: True if everything goes according to plan
-
update_settings_file(queue='', package={})
Update the settings in a config file for a specified queue or in the root dir
Parameters:
- queue – if empty, change the config in the root folder. If a queue name, will change the config for that queue
- package – a dict containing the changes to the config file
Returns: None
-