Compute Queue

Getting Started

hmq is a minimalistic API-first Python task queue that makes it easy to parallelize expensive function calls. This tutorial will walk you through setting up hmq, defining tasks, submitting jobs, and retrieving results.

First, install hmq using pip:

pip install --upgrade hmq

To create a task, simply decorate a function with @hmq.task. This turns the function into a task that can be queued and executed asynchronously.

import hmq

# Define a costly function as a task
@hmq.task
def costly_calculation(arg1, arg2):
    # Simulate expensive computation
    result = arg1 * arg2  # Replace with actual logic
    return result

Once you've defined a task, you can call it as you normally would. However, instead of executing immediately, each call is recorded in the task queue.

# Example dataset
big_dataset = [(2, 3), (4, 5), (6, 7)]

# Queue multiple jobs
for case in big_dataset:
    costly_calculation(*case)

At this point, all the function calls have been recorded, but they have not yet been executed.

To process the queued tasks, use .submit():

# Submit all queued tasks
tag = costly_calculation.submit()

This sends the tasks to the server for execution and returns a tag that identifies the submission.

You can monitor progress using tag.pull(), which downloads any finished results and returns the number of jobs still pending in the queue.

import time

# Wait until all tasks are completed
while True:
    remaining_jobs = tag.pull()
    if remaining_jobs == 0:
        break
    time.sleep(10)  # Check again after a short delay

Instead of polling manually, you can also call tag.pull(blocking=True), which retries until all data has been fetched. Once all jobs are completed, you can access the results and any errors encountered during execution.

print("Results:", list(tag.results))
print("Errors:", list(tag.errors))

How to structure your code

There is very little need to restructure your code. Function dependencies are included automatically: in the example below, the helper function subset is packed along with task as soon as task is decorated to become a queueing task:

def subset(number: int) -> int:
    return number * 2

@hmq.task
def task(up_to: int) -> int:
    return sum(subset(_) for _ in range(up_to))

task(10)
tag = task.submit()
tag.pull(blocking=True)
print(list(tag.results))  # [90]

Packing functions from other modules or packages is not supported; those are expected to be installed on the cluster directly. This is achieved by specifying the dependency upon submission and by making the import local to the function:

@hmq.task
def task(up_to) -> int:
    # imports within the function
    import numpy as np

    return (np.arange(up_to) * 2).sum()

task(10)
tag = task.submit(packages=["numpy"])
tag.pull(blocking=True)
print(list(tag.results))  # [90]

This would automatically install the numpy package in the execution environment.

Security

Tasks are encrypted and digitally signed, similar to SSH keys. Each task is traceable to an individual user, ensuring accountability and security. The code and results are stored in an encrypted format on the central database server. Execution requires permission from a root signature, ensuring that only authorized tasks are processed.

To gain access, a user must request permission through a signing request. This can be done using the following command:

import hmq
hmq.request_access(url)

where url is the address of the API server, which typically starts with hmq, e.g. hmq.example.com.

This command produces a signing request in the format ADD-USER_......, which should be shared with the system administrator. The administrator can then grant access using the following command, where the username can be chosen freely:

hmq.grant_access(signing_request, adminkey, username)

FAQ / Troubleshooting

Can I change the name of the tag?

All tags have a unique identifier which by default is function_name_UUID. For example, this code

@hmq.task
def test():
    return 42

might yield the tag test_d6425b7f-93af-4400-a71d-63a2b5b53698. You can change the name upon submission using the argument tag:

@hmq.task
def test():
    return 42

for i in range(10):
    test()
test.submit(tag="myname")

which now might yield the tag myname_d6425b7f-93af-4400-a71d-63a2b5b53698. You cannot change the UUID which is required to avoid naming conflicts.

How long should one function evaluation take?

Each individual function call transferred over the network to the compute datacenters should take at least a few seconds, to avoid congestion with hmq's default settings. The more results you produce per unit of time, the smaller each result should be. Most setups can ingest a result stream of at least 10 MB/s: if your function returns 1 KB, runs for two seconds, and you have 1000 workers, you produce 1 KB / 2 s × 1000 = 0.5 MB/s of result data. The end-to-end overhead per function call is about 0.1 seconds, so extremely short run times become wasteful.
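The sizing rule above can be checked with a quick back-of-the-envelope calculation. The numbers below are the example values from the text:

import hashlib  # not needed; placeholder removed
```python
def result_stream_mb_per_s(result_kb: float, runtime_s: float, workers: int) -> float:
    """Combined result data rate produced by all workers, in MB/s."""
    return result_kb / 1024 * workers / runtime_s

# Example from the text: 1 KB results, 2 s runtime, 1000 workers
rate = result_stream_mb_per_s(result_kb=1, runtime_s=2, workers=1000)
print(f"{rate:.2f} MB/s")  # about 0.49 MB/s, well below a 10 MB/s ingest limit

# Fraction of wall time lost to the ~0.1 s per-call overhead
overhead_fraction = 0.1 / (0.1 + 2.0)
print(f"{overhead_fraction:.1%} overhead")  # under 5% for a 2 s task
```

If either the result stream approaches the ingest limit or the overhead fraction becomes significant, batch several evaluations into one function call.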

What is the database format and is it suited for large amounts of data and for archiving?

The database format is compressed JSON stored in SQLite, a well-established and stable format that handles large amounts of data efficiently. It is suited to out-of-core processing, i.e. datasets that do not fit entirely in memory, and individual results can be addressed directly, making querying and retrieval efficient. SQLite also supports concurrent access by multiple users or processes, and mature libraries exist in many programming languages, Python in particular, which makes the format a solid choice for both processing and archiving.
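The storage pattern itself is easy to reproduce with the Python standard library. The table layout below is a made-up illustration of the general idea (one compressed-JSON blob per task, addressable by key), not hmq's actual schema:

```python
import json
import sqlite3
import zlib

# Illustrative schema only -- NOT hmq's actual table layout
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE results (task_id TEXT PRIMARY KEY, payload BLOB)")

# Store one result as a compressed JSON blob
result = {"task": "costly_calculation", "value": 6}
blob = zlib.compress(json.dumps(result).encode())
con.execute("INSERT INTO results VALUES (?, ?)", ("task-1", blob))

# Individual results can be addressed directly by key
row = con.execute(
    "SELECT payload FROM results WHERE task_id = ?", ("task-1",)
).fetchone()
restored = json.loads(zlib.decompress(row[0]))
print(restored)  # {'task': 'costly_calculation', 'value': 6}
```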

Downloading a large tag with millions of tasks takes too much memory.

Result sets much larger than system memory are not a problem. By default, results are kept in memory until the tag is written to a file. If your dataset does not fit in memory, specify a local file for storage first:

tag = hmq.Tag.from_queue("test_d6425b7f-93af-4400-a71d-63a2b5b53698") # use your tag
tag.to_file("filename.db", stay_linked=True) # assigns a place to store the data
tag.pull() # downloads the results

The library assumes that the list of task ids (UUIDs) fits into memory. 1 GB of memory is enough for 30 million tasks.

How to include a local module which is not pip-installable?

If a module is pip-installable, it is greatly preferred to install it that way. However, if you have a local module named foobar which your code needs, you can submit it to the queue by running the following snippet before defining a task with the hmq.task decorator:

import foobar
import cloudpickle
cloudpickle.register_pickle_by_value(foobar)

Queueing System

hmq.api = API() module-attribute

hmq.API()

hmq.API.cancel_tasks(tasks: list[str] = None, tag: str = None) -> list[str]

Cancels tasks in the queue.

Identification of tasks may be done either by complete list of task ids or tag names.

PARAMETER DESCRIPTION
tasks

List of task ids. Defaults to None.

TYPE: list[str] DEFAULT: None

tag

Tag name. Defaults to None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
list[str]

List of task ids canceled.

hmq.API.delete_tasks(tasks: list[str] = None, tag: str = None) -> list[str]

Deletes tasks from the queue.

Identification of tasks may be done either by complete list of task ids or tag names.

PARAMETER DESCRIPTION
tasks

List of task ids. Defaults to None.

TYPE: list[str] DEFAULT: None

tag

Tag name. Defaults to None.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
list[str]

List of task ids deleted.

hmq.API.dequeue_tasks(datacenter: str, packagelists: dict[str, str], maxtasks: int, available: int, allocated: int, running: int, used: int)

hmq.API.fetch_function(function: str)

hmq.API.find_tasks(tag: str)

hmq.API.get_tag_status(tag: str) -> dict[str, int]

Fetch task statistics for a given tag.

PARAMETER DESCRIPTION
tag

The tag to inspect.

TYPE: str

RETURNS DESCRIPTION
dict[str, int]

Task status counts if successful, None otherwise.
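
A sketch of consuming the returned counts. The state keys below are illustrative assumptions, not hmq's documented state names:

```python
# Hypothetical return value of hmq.api.get_tag_status("myname_...") --
# the keys here are made-up examples of per-state task counts.
status = {"pending": 12, "running": 3, "done": 85, "failed": 0}

total = sum(status.values())
finished = status["done"] + status["failed"]
print(f"{finished}/{total} tasks finished")
```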

hmq.API.get_tasks_status(tasks: list[str])

hmq.API.grant_access(signing_request: str, adminkey: str, username: str)

Signs a user request and grants access.

PARAMETER DESCRIPTION
signing_request

The signing request generated with setup().

TYPE: str

adminkey

The admin key.

TYPE: str

username

Arbitrary username to identify the user.

TYPE: str

RAISES DESCRIPTION
ValueError

The admin key was not accepted.

hmq.API.missing_dependencies(datacenter, installlist, python_version)

hmq.API.ping() -> bool

Test connection to server.

RETURNS DESCRIPTION
bool

Whether the server is reachable.

hmq.API.register_function(remote_function: dict, digest: str)

hmq.API.retrieve_results(tasks: list[str])

hmq.API.setup(url: str) -> str

Connect to queueing API.

PARAMETER DESCRIPTION
url

The server URL.

TYPE: str

RAISES DESCRIPTION
ValueError

Only one instance is supported.

RETURNS DESCRIPTION
str

The signing request.

hmq.API.store_results(results: list)

hmq.API.submit_tasks(tag: str, digest: str, calls: list, ncores: int = 1, datacenters: list = None)

hmq.API.sync_tasks(datacenter: str, known: list[str]) -> list[str]

hmq.API.warm_cache(function: str, payload)

hmq.CachedWorker(*args, **kwargs)

Bases: Worker

hmq.CachedWorker.execute_job(job, queue)

hmq.Tag(name)

Represents a collection of tasks.

ATTRIBUTE DESCRIPTION
errors

Generator yielding all errors.

TYPE: Generator[str, None, None]

n_errors

Count the number of errors.

TYPE: int

n_results

Count the number of results.

TYPE: int

name

TYPE: str

results

Generator yielding all results.

TYPE: Generator[str, None, None]

hmq.Tag.errors: typing.Generator[str, None, None] property

Generator yielding all errors.

The ordering remains stable as long as no tasks are added, but it does not match the order of submission.

RETURNS DESCRIPTION
Generator[str, None, None]

Yields errors, or None if the task has not been completed or no error has occurred.

hmq.Tag.n_errors: int property

Count the number of errors.

hmq.Tag.n_results: int property

Count the number of results.

hmq.Tag.name: str property writable

hmq.Tag.results: typing.Generator[str, None, None] property

Generator yielding all results.

The ordering remains stable as long as no tasks are added, but it does not match the order of submission.

RETURNS DESCRIPTION
Generator[str, None, None]

Yields results, or None if the task has not been completed or an error occurred.

hmq.Tag.__len__()

hmq.Tag.cancel()

Cancels all remaining tasks in the queue.

hmq.Tag.delete(downloaded_only=False)

Delete data from the queue.

hmq.Tag.fetch_tasks() -> int

Adds all remaining tasks in the queue for this tag.

Existing tasks in this tag remain available.

RETURNS DESCRIPTION
int

Number of tasks added.

hmq.Tag.from_file(filename: str) -> Tag staticmethod

Loads a tag and all results for all tasks from a file.

PARAMETER DESCRIPTION
filename

Source file.

TYPE: str

RETURNS DESCRIPTION
Tag

The loaded tag.

RAISES DESCRIPTION
ValueError

If the file does not exist.

hmq.Tag.from_queue(tag: str) -> Tag staticmethod

Loads a tag from the queue by downloading the corresponding tasks.

Does not pull results.

PARAMETER DESCRIPTION
tag

The tag name.

TYPE: str

RETURNS DESCRIPTION
Tag

The populated object.

hmq.Tag.pull(blocking: bool = False, batchsize: int = 100, tasks_subset: list[str] = None, workers: int = 4) -> int

Downloads data from the queue for all tasks in the tag.

PARAMETER DESCRIPTION
blocking

Whether to retry downloading until all data has been fetched. Defaults to False.

TYPE: bool DEFAULT: False

batchsize

Number of tasks to download at once. Defaults to 100.

TYPE: int DEFAULT: 100

tasks_subset

Subset of tasks to download. Defaults to None.

TYPE: list[str] DEFAULT: None

workers

Number of parallel workers. Defaults to 4.

TYPE: int DEFAULT: 4

RETURNS DESCRIPTION
int

Number of remaining tasks for which neither result nor error is available.

hmq.Tag.to_file(filename: str, stay_linked: bool = False, overwrite: bool = False)

Exports the current state to a file.

PARAMETER DESCRIPTION
filename

Export destination.

TYPE: str

stay_linked

If True, switches this tag to a file-based database.

TYPE: bool DEFAULT: False