
Compute Queue

Getting Started

hmq is a minimalistic Python task queue that makes it easy to parallelize expensive function calls. This tutorial will walk you through setting up hmq, defining tasks, submitting jobs, and retrieving results.

First, install hmq using pip:

pip install --upgrade hmq

To create a task, simply decorate a function with @hmq.task. This turns the function into a task that can be queued and executed asynchronously.

import hmq

# Define a costly function as a task
@hmq.task
def costly_calculation(arg1, arg2):
    # Simulate expensive computation
    result = arg1 * arg2  # Replace with actual logic
    return result

Once you've defined a task, you can call it as you would normally. However, instead of executing immediately, the calls will be recorded in the task queue.

# Example dataset
big_dataset = [(2, 3), (4, 5), (6, 7)]

# Queue multiple jobs
for case in big_dataset:
    costly_calculation(*case)

At this point, all the function calls have been recorded, but they have not yet been executed.
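The sketch below shows how a decorator can record calls instead of executing them. It is not hmq's implementation. `recording_task` is a hypothetical name, and its `submit()` runs the recorded calls locally rather than sending them to a server.

```python
import functools


def recording_task(func):
    """Hypothetical stand-in for a queueing decorator: calls are recorded, not run."""
    calls = []

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Store the arguments instead of executing the function.
        calls.append((args, kwargs))

    def submit():
        # Stand-in for sending the batch to a server: run every recorded call
        # locally, then clear the record so the next batch starts empty.
        results = [func(*args, **kwargs) for args, kwargs in calls]
        calls.clear()
        return results

    wrapper.calls = calls
    wrapper.submit = submit
    return wrapper


@recording_task
def costly_calculation(arg1, arg2):
    return arg1 * arg2


for case in [(2, 3), (4, 5), (6, 7)]:
    costly_calculation(*case)

print(len(costly_calculation.calls))  # 3 calls recorded, none executed yet
print(costly_calculation.submit())    # [6, 20, 42]
```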

To process the queued tasks, use .submit():

# Submit all queued tasks
tag = costly_calculation.submit()

This sends the recorded calls for execution and returns a tag (an hmq.Tag) that represents this batch of tasks.

You can monitor progress with tag.pull(), which downloads any results that have become available and returns the number of tasks for which neither a result nor an error is available yet.

import time

# Wait until all tasks are completed
while True:
    remaining_jobs = tag.pull()
    if remaining_jobs == 0:
        break
    time.sleep(10)  # Check again after a short delay
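The polling loop above can be wrapped in a small helper that also stops after a deadline. In this sketch, `wait_for_tag` and its `timeout` parameter are hypothetical. It relies only on the documented behaviour that `tag.pull()` returns the number of tasks still pending. If you do not need a timeout, the API reference also documents `tag.pull(blocking=True)`.

```python
import time


def wait_for_tag(tag, poll_interval=10.0, timeout=None):
    """Poll tag.pull() until no tasks remain or the timeout expires (hypothetical helper).

    Returns the number of remaining tasks (0 if everything finished).
    """
    start = time.monotonic()
    while True:
        remaining = tag.pull()  # downloads available data, returns pending count
        if remaining == 0:
            return 0
        if timeout is not None and time.monotonic() - start >= timeout:
            return remaining  # give up and report how many tasks are unfinished
        time.sleep(poll_interval)
```

For example, `remaining = wait_for_tag(tag, poll_interval=10, timeout=3600)` returns 0 once everything has finished. If an hour passes first, it returns the number of unfinished tasks.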

Once all jobs are completed, you can access results and any errors encountered during execution.

print("Results:", list(tag.results))
print("Errors:", list(tag.errors))

How to structure your code

Little restructuring of your code is needed, because functions that a task calls are included automatically. In the following example, subset is packaged along with task as soon as task is decorated with @hmq.task:

def subset(number: int) -> int:
    return number * 2

@hmq.task
def task(up_to: int) -> int:
    return sum(subset(i) for i in range(up_to))

task(10)
tag = task.submit()
tag.pull(blocking=True)
print(list(tag.results))  # [90]

Functions from other modules or packages are not packed. Those packages are expected to be installed on the cluster directly. To use one, declare it when submitting and import it inside the task function:

@hmq.task
def task(up_to) -> int:
    # imports within the function
    import numpy as np

    return (np.arange(up_to) * 2).sum()

task(10)
tag = task.submit(packages=["numpy"])
tag.pull(blocking=True)
print(list(tag.results))  # [90]

This installs the numpy package automatically in the execution environment.
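Function-local imports work because Python executes an `import` statement only when control reaches it. In the sketch below, the package name is a deliberately nonexistent placeholder. Defining the function succeeds, and the import fails only when the function is called. For an hmq task, that call happens in the execution environment where `packages=[...]` has installed the dependency.

```python
def task_with_local_import():
    # This import runs only when the function is called, so defining the
    # function never requires the package to be installed locally.
    import hmq_example_package_that_is_not_installed  # hypothetical, nonexistent package

    return hmq_example_package_that_is_not_installed.compute()


# Defining the function succeeded; the import is attempted only on call.
try:
    task_with_local_import()
except ModuleNotFoundError as exc:
    print("Import failed at call time:", exc.name)
```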

Security

Tasks are encrypted and digitally signed using keys similar to SSH keys. Each task is traceable to an individual user, which ensures accountability. Code and results are stored encrypted on the central database server. Execution requires permission from a root signature, so only authorized tasks are processed.

To gain access, a user must request permission through a signing request. This can be done using the following command:

import hmq
hmq.request_access(url)

where url is the address of the hmq API server, which typically starts with hmq, e.g. hmq.example.com.

This command produces a signing request in the format ADD-USER_......, which should be shared with the system administrator. The administrator can then grant access using the following command, where the username can be chosen freely:

hmq.grant_access(signing_request, adminkey, username)

FAQ / Troubleshooting

Can I change the name of the tag?

Every tag has a unique identifier, which by default has the form function_name_UUID. For example, this code

@hmq.task
def test():
    return 42

might yield the tag test_d6425b7f-93af-4400-a71d-63a2b5b53698. You can change the name upon submission using the argument tag:

@hmq.task
def test():
    return 42

for i in range(10):
    test()
test.submit(tag="myname")

which now yields a tag such as myname_d6425b7f-93af-4400-a71d-63a2b5b53698. The UUID part cannot be changed, because it is required to avoid naming conflicts.
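A UUID never contains underscores, so a tag name of the form prefix_UUID can be split unambiguously at its last underscore. This works even when the prefix itself contains underscores. The helpers below are hypothetical and only mirror the documented naming scheme. hmq generates the actual names itself, and using `uuid4()` for the UUID part is an assumption.

```python
import uuid


def make_tag_name(prefix: str) -> str:
    """Build a name in the documented prefix_UUID form (hypothetical helper)."""
    return f"{prefix}_{uuid.uuid4()}"


def split_tag_name(tag: str) -> tuple[str, uuid.UUID]:
    """Split prefix_UUID at the last underscore; UUIDs contain hyphens, not underscores."""
    prefix, _, suffix = tag.rpartition("_")
    return prefix, uuid.UUID(suffix)


prefix, tag_uuid = split_tag_name("myname_d6425b7f-93af-4400-a71d-63a2b5b53698")
print(prefix)    # myname
print(tag_uuid)  # d6425b7f-93af-4400-a71d-63a2b5b53698
```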

How long should one function evaluation take?

Each function call is transferred over the network to the compute datacenters. With the default settings of hmq, each call should therefore run for at least a few seconds to avoid congestion. The more results you expect per unit of time, the smaller each result should be. Most setups can ingest a result stream of at least 10 MB/s. For example, if your function returns 1 KB and runs for two seconds on each of 1000 workers, you produce result data at 1 KB / 2 s * 1000 = 0.5 MB/s. The end-to-end overhead per function call is about 0.1 seconds, so very short run times waste a large share of the compute time.
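The sizing rule above is simple arithmetic, sketched below with hypothetical helper names. The rate follows the 1 KB / 2 s * 1000 example and ignores per-call overhead. The overhead fraction is the 0.1 s end-to-end overhead divided by the total time per call.

```python
def result_rate_mb_per_s(result_kb: float, workers: int, runtime_s: float) -> float:
    """Aggregate result stream when every worker returns one result per runtime_s."""
    return result_kb * workers / runtime_s / 1000  # 1 MB = 1000 KB, as in the example


def overhead_fraction(runtime_s: float, overhead_s: float = 0.1) -> float:
    """Share of wall time per call spent on queueing overhead rather than computing."""
    return overhead_s / (runtime_s + overhead_s)


print(result_rate_mb_per_s(result_kb=1, workers=1000, runtime_s=2))  # 0.5
print(round(overhead_fraction(2.0), 3))  # 0.048
print(round(overhead_fraction(0.1), 3))  # 0.5
```

At a two-second run time the overhead is below 5%. A function that runs for only 0.1 s spends half of its wall time on overhead.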

Downloading a large tag with millions of tasks takes too much memory.

Result sets much larger than system memory are not a problem. By default, results are kept in memory until the tag is written to a file. If your results do not fit in memory, assign a local file for storage before downloading:

import hmq

tag = hmq.Tag.from_queue("test_d6425b7f-93af-4400-a71d-63a2b5b53698")  # use your tag
tag.to_file("filename.db", stay_linked=True)  # assigns a file to store the data
tag.pull()  # downloads the results into that file

The library does assume that the list of task ids (UUIDs) fits into memory. As a rule of thumb, 1 GB of memory is enough for about 30 million tasks.
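File-based storage works by writing each downloaded batch to disk and reading it back lazily. Memory use is then bounded by the batch size rather than by the total result size. The sketch below illustrates the idea with the standard-library `sqlite3` module. It is not hmq's file format, and all function and table names are hypothetical.

```python
import sqlite3


def open_store(path: str) -> sqlite3.Connection:
    """Open (or create) a file-backed result store; ':memory:' also works for tests."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS results (task_id TEXT PRIMARY KEY, result TEXT)")
    return conn


def store_batch(conn: sqlite3.Connection, batch: dict[str, str]) -> None:
    """Write one downloaded batch to disk, so memory use is bounded by the batch size."""
    with conn:  # commits the transaction on success
        conn.executemany(
            "INSERT OR REPLACE INTO results (task_id, result) VALUES (?, ?)",
            batch.items(),
        )


def iter_results(conn: sqlite3.Connection):
    """Yield results one row at a time instead of loading them all into a list."""
    for (result,) in conn.execute("SELECT result FROM results ORDER BY task_id"):
        yield result


conn = open_store(":memory:")
store_batch(conn, {"task-1": "6", "task-2": "20"})
store_batch(conn, {"task-3": "42"})
print(list(iter_results(conn)))  # ['6', '20', '42']
```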

Queueing System

hmq.api = API() module-attribute

hmq.API()

hmq.API.delete_tasks(tasks: list[str] = None, tag: str = None)

Deletes tasks from the queue.

Tasks can be identified either by a complete list of task ids or by a tag name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| tasks | list[str] | List of task ids. | None |
| tag | str | Tag name. | None |

Returns:

| Type | Description |
|------|-------------|
| list | List of deleted task ids. |

hmq.API.dequeue_tasks(datacenter: str, packagelists: dict[str, str], maxtasks: int, available: int, allocated: int, running: int, used: int)

hmq.API.fetch_function(function: str)

hmq.API.find_tasks(tag: str)

hmq.API.get_tag_status(tag: str) -> dict[str, int]

Fetch task statistics for a given tag.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| tag | str | The tag to inspect. | required |

Returns:

| Type | Description |
|------|-------------|
| dict[str, int] | Task status counts if successful, None otherwise. |

hmq.API.get_tasks_status(tasks: list[str])

hmq.API.grant_access(signing_request: str, adminkey: str, username: str)

Signs a user request and grants access.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| signing_request | str | The signing request generated with setup(). | required |
| adminkey | str | The admin key. | required |
| username | str | Arbitrary username to identify the user. | required |

Raises:

| Type | Description |
|------|-------------|
| ValueError | Admin key was not supported. |

hmq.API.missing_dependencies(datacenter, installlist, python_version)

hmq.API.ping()

Test connection to server.

Returns:

| Type | Description |
|------|-------------|
| bool | Whether the server is reachable. |

hmq.API.register_function(remote_function: dict, digest: str)

hmq.API.retrieve_results(tasks: list[str])

hmq.API.setup(url: str)

Connect to queueing API.

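Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | The server URL. | required |

Raises:

| Type | Description |
|------|-------------|
| ValueError | Only one instance is supported. |

Returns:

| Type | Description |
|------|-------------|
| str | The signing request. |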

hmq.API.store_results(results: list)

hmq.API.submit_tasks(tag: str, digest: str, calls: list, ncores: int = 1, datacenters: list = None)

hmq.API.sync_tasks(datacenter: str, known: list[str]) -> list[str]

hmq.API.warm_cache(function: str, payload)

hmq.CachedWorker(*args, **kwargs)

Bases: Worker

hmq.CachedWorker.execute_job(job, queue)

hmq.Tag(name)

Represents a collection of tasks.

hmq.Tag.errors: typing.Generator[str, None, None] property

Generator yielding all errors.

The ordering remains stable as long as no tasks are added, but it does not match the order of submission.

Returns:

| Type | Description |
|------|-------------|
| Generator[str, None, None] | Yields errors, or None if the task has not been completed or no error has occurred. |

hmq.Tag.name: str property writable

hmq.Tag.results: typing.Generator[str, None, None] property

Generator yielding all results.

The ordering remains stable as long as no tasks are added, but it does not match the order of submission.

Returns:

| Type | Description |
|------|-------------|
| Generator[str, None, None] | Yields results, or None if the task has not been completed or an error occurred. |

hmq.Tag.__len__()

hmq.Tag.delete()

Delete all remaining tasks from the queue.

This aborts the remaining calculations.

hmq.Tag.from_file(filename: str) -> Tag staticmethod

Loads a tag and all results for all tasks from a file.

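Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| filename | str | Source file. | required |

Returns:

| Type | Description |
|------|-------------|
| Tag | The loaded tag. |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the file does not exist. |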

hmq.Tag.from_queue(tag: str) -> Tag staticmethod

Loads a tag from the queue by downloading the corresponding tasks.

Does not pull results.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| tag | str | The tag name. | required |

Returns:

| Type | Description |
|------|-------------|
| Tag | The populated object. |

hmq.Tag.pull(blocking: bool = False, batchsize: int = 100, tasks_subset: list[str] = None, workers: int = 4) -> int

Downloads data from the queue for all tasks in the tag.

Parameters:

Name Type Description Default
blocking bool

Whether to retry downloading until all data has been fetched. Defaults to False.

False
batchsize int

Number of tasks to download at once. Defaults to 100.

100
tasks_subset list[str]

Subset of tasks to download. Defaults to None.

None
workers int

Number of parallel workers. Defaults to 4.

4

Returns:

Name Type Description
int int

Number of remaining tasks for which neither result nor error is available.
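The batchsize and workers parameters suggest a batched, parallel download. The following sketch shows that pattern under stated assumptions. It is not hmq's implementation. `pull_sketch` is hypothetical, `fetch_batch` stands in for the server request, and a plain dict plays the role of the tag's result storage. Passing a shorter `task_ids` list would correspond to tasks_subset. Calling the function repeatedly until it returns 0 would correspond to blocking=True.

```python
from concurrent.futures import ThreadPoolExecutor


def pull_sketch(task_ids, store, fetch_batch, batchsize=100, workers=4):
    """Download pending tasks in batches using a thread pool (hypothetical sketch).

    fetch_batch(ids) stands in for a server call and must return a dict that maps
    each finished task id to its result or error; unfinished ids are omitted.
    Returns the number of tasks that still have neither result nor error.
    """
    pending = [task_id for task_id in task_ids if task_id not in store]
    batches = [pending[i:i + batchsize] for i in range(0, len(pending), batchsize)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for downloaded in pool.map(fetch_batch, batches):
            store.update(downloaded)  # merged in the main thread, so no locking needed
    return sum(1 for task_id in task_ids if task_id not in store)


# Pretend every fifth task is still running on the server.
finished = {f"t{i}": i * 2 for i in range(250) if i % 5}
remaining = pull_sketch(
    [f"t{i}" for i in range(250)],
    store={},
    fetch_batch=lambda ids: {tid: finished[tid] for tid in ids if tid in finished},
)
print(remaining)  # 50
```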

hmq.Tag.to_file(filename: str, stay_linked: bool = False)

Exports the current state to a file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| filename | str | Export destination. | required |
| stay_linked | bool | If True, switches this tag to a file-based database. | False |