Compute Queue
Getting Started
hmq is a minimalistic Python task queue that makes it easy to parallelize expensive function calls. This tutorial walks you through setting up hmq, defining tasks, submitting jobs, and retrieving results.
First, install hmq using pip:

```shell
pip install --upgrade hmq
```
To create a task, decorate a function with @hmq.task. This turns the function into a task that can be queued and executed asynchronously.

```python
import hmq

# Define a costly function as a task
@hmq.task
def costly_calculation(arg1, arg2):
    # Simulate expensive computation
    result = arg1 * arg2  # Replace with actual logic
    return result
```
Once you've defined a task, you can call it as you would normally. However, instead of executing immediately, the calls will be recorded in the task queue.
```python
# Example dataset
big_dataset = [(2, 3), (4, 5), (6, 7)]

# Queue multiple jobs
for case in big_dataset:
    costly_calculation(*case)
```
At this point, all the function calls have been recorded, but they have not yet been executed.
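To make the deferred-execution idea concrete, here is a minimal sketch in plain Python of a decorator that records calls instead of running them. It is hypothetical and not hmq's implementation: the names recording_task, calls, and run_all are invented for illustration, and run_all executes the recorded calls locally where hmq's submit() would send them to the queue.

```python
import functools
from typing import Any, Callable


def recording_task(func: Callable[..., Any]) -> Callable[..., None]:
    """Hypothetical stand-in for a queueing decorator: calls are recorded, not run."""
    calls: list[tuple[tuple, dict]] = []

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> None:
        calls.append((args, kwargs))  # remember the arguments only

    def run_all() -> list[Any]:
        """Execute every recorded call locally, then clear the record."""
        results = [func(*a, **kw) for a, kw in calls]
        calls.clear()
        return results

    wrapper.calls = calls
    wrapper.run_all = run_all
    return wrapper


@recording_task
def costly_calculation(arg1, arg2):
    return arg1 * arg2


for case in [(2, 3), (4, 5), (6, 7)]:
    costly_calculation(*case)  # nothing is computed yet

print(len(costly_calculation.calls))  # 3
print(costly_calculation.run_all())   # [6, 20, 42]
```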
To process the queued tasks, use .submit():

```python
# Submit all queued tasks
tag = costly_calculation.submit()
```
This sends the recorded calls to the queue for execution and returns a tag (an hmq.Tag) that identifies this batch of tasks.
You can monitor progress with tag.pull(), which downloads any available results and returns the number of tasks for which neither a result nor an error is available yet.

```python
import time

# Wait until all tasks are completed
while True:
    remaining_jobs = tag.pull()
    if remaining_jobs == 0:
        break
    time.sleep(10)  # Check again after a short delay
```
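The loop above waits indefinitely, and tag.pull(blocking=True) likewise retries until all data has been fetched. If you prefer an upper bound on the waiting time, a small helper can add a timeout. This is a sketch, not part of hmq: wait_until_done, interval, and timeout are hypothetical names, and pull can be any callable that returns the remaining task count.

```python
import time
from typing import Callable, Optional


def wait_until_done(pull: Callable[[], int], interval: float = 10.0,
                    timeout: Optional[float] = None) -> int:
    """Call pull() until it reports 0 remaining tasks or the timeout expires.

    Returns the last remaining count: 0 on success, > 0 if the timeout hit first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        remaining = pull()
        if remaining == 0:
            return 0
        if deadline is not None and time.monotonic() >= deadline:
            return remaining
        time.sleep(interval)  # wait before polling again


# Example with a stand-in for tag.pull that reports 3, 1, then 0 remaining tasks
counts = iter([3, 1, 0])
print(wait_until_done(lambda: next(counts), interval=0))  # 0
```

With hmq you would pass the bound method, e.g. wait_until_done(tag.pull, interval=10, timeout=3600), and treat a non-zero return value as "still running".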
Once all jobs are completed, you can access results and any errors encountered during execution.
```python
print("Results:", list(tag.results))
print("Errors:", list(tag.errors))
```
How to structure your code
There is little need to restructure your code. Functions that a task depends on are included automatically. For example, subset is packaged along with task once task is decorated as a queueing task:

```python
import hmq

def subset(number: int) -> int:
    return number * 2

@hmq.task
def task(up_to: int) -> int:
    return sum(subset(i) for i in range(up_to))

task(10)
tag = task.submit()
tag.pull(blocking=True)
print(list(tag.results))  # [90]
```
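How might a library notice that subset belongs with task? One possible approach, sketched below and not necessarily what hmq does, is to collect the global names used in the function's bytecode, including nested code objects such as the generator expression in task where the call to subset actually lives, and keep those that refer to plain Python functions. The helper names referenced_names and function_dependencies are hypothetical, and the name matching is a heuristic.

```python
import types


def referenced_names(code: types.CodeType) -> set[str]:
    """Collect global/attribute names used by code and any nested code objects."""
    names = set(code.co_names)
    for const in code.co_consts:
        if isinstance(const, types.CodeType):  # e.g. a generator expression
            names |= referenced_names(const)
    return names


def function_dependencies(func: types.FunctionType) -> dict[str, types.FunctionType]:
    """Return the plain Python functions func relies on, directly or transitively."""
    found: dict[str, types.FunctionType] = {}
    pending = [func]
    while pending:
        current = pending.pop()
        for name in referenced_names(current.__code__):
            obj = current.__globals__.get(name)
            # Modules and builtins are skipped; only user-defined functions qualify
            if isinstance(obj, types.FunctionType) and obj is not func and name not in found:
                found[name] = obj
                pending.append(obj)  # follow the dependency's own dependencies
    return found


# Same shape as the example above, without the hmq decorator
def subset(number: int) -> int:
    return number * 2


def task(up_to: int) -> int:
    return sum(subset(i) for i in range(up_to))


print(sorted(function_dependencies(task)))  # ['subset']
```

Because imported modules such as numpy are not plain functions, a scheme like this naturally leaves them out, which matches the rule that packages must be installed in the execution environment instead.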
Packaging functions from other modules or packages is not supported; such packages are expected to be installed on the cluster directly. To use one, specify the dependency upon submission and import it inside the function:

```python
import hmq

@hmq.task
def task(up_to: int) -> int:
    # Imports within the function
    import numpy as np
    return int((np.arange(up_to) * 2).sum())

task(10)
tag = task.submit(packages=["numpy"])
tag.pull(blocking=True)
print(list(tag.results))  # [90]
```
This automatically installs the numpy package in the execution environment.
Security
Tasks are encrypted and digitally signed with per-user keys, similar to SSH keys. Each task is traceable to an individual user, ensuring accountability. The code and results are stored in encrypted form on the central database server. Execution requires authorization by a root signature, so only authorized tasks are processed.
To gain access, a user must request permission through a signing request. This can be done with the following command:

```python
import hmq

hmq.request_access(url)
```
where url is the URL of the API server, typically starting with hmq, e.g. hmq.example.com.
This command produces a signing request of the form ADD-USER_......, which should be shared with the system administrator. The administrator can then grant access with the following command, where the username can be chosen freely:

```python
hmq.grant_access(signing_request, adminkey, username)
```
FAQ / Troubleshooting
Can I change the name of the tag?
All tags have a unique identifier, which by default is function_name_UUID. For example, this code

```python
@hmq.task
def test():
    return 42
```
might yield the tag test_d6425b7f-93af-4400-a71d-63a2b5b53698. You can change the name upon submission using the tag argument:

```python
@hmq.task
def test():
    return 42

for i in range(10):
    test()
test.submit(tag="myname")
```
which might now yield the tag myname_d6425b7f-93af-4400-a71d-63a2b5b53698. You cannot change the UUID, which is required to avoid naming conflicts.
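If you need to recover the chosen name from a full tag, for example to group several submissions, the documented name_UUID format can be split at the last underscore, since a UUID string contains hyphens but no underscores. The helpers make_tag and split_tag below are hypothetical and only mirror the documented format; hmq generates the UUID itself.

```python
import uuid


def make_tag(name: str) -> str:
    """Build an identifier of the form name_UUID (illustration only)."""
    return f"{name}_{uuid.uuid4()}"


def split_tag(tag: str) -> tuple[str, uuid.UUID]:
    """Split name_UUID at the last underscore; the name itself may contain underscores."""
    name, sep, suffix = tag.rpartition("_")
    if not sep:
        raise ValueError(f"not of the form name_UUID: {tag!r}")
    return name, uuid.UUID(suffix)  # raises ValueError if the suffix is not a UUID


print(split_tag("test_d6425b7f-93af-4400-a71d-63a2b5b53698")[0])  # test
```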
How long should one function evaluation take?
Each function call is transferred over the network to the compute datacenters, so with the default settings of hmq every call should run for at least a few seconds to avoid congestion. The more results you expect per unit of time, the smaller each result should be. Most setups can ingest a result stream of at least 10 MB/s. For example, if your function returns 1 KB, runs for two seconds, and you have 1000 workers, you produce 1 KB / 2 s * 1000 = 0.5 MB/s of result data. The end-to-end overhead per function call is about 0.1 seconds, so extremely short run times are wasteful.
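The arithmetic above can be checked with a few lines of Python. The helper names are hypothetical (not part of hmq), the 10 MB/s limit is the "most setups" figure quoted above, and the overhead estimate assumes the roughly 0.1 s per-call overhead simply adds to each call's run time.

```python
def result_rate_mb_per_s(result_kb: float, runtime_s: float, workers: int) -> float:
    """Aggregate result stream in MB/s, using 1 MB = 1000 KB."""
    return result_kb / runtime_s * workers / 1000


def overhead_fraction(runtime_s: float, overhead_s: float = 0.1) -> float:
    """Share of each call's wall time spent on per-call overhead (assumed additive)."""
    return overhead_s / (runtime_s + overhead_s)


INGEST_LIMIT_MB_PER_S = 10  # typical ingest capacity quoted in the text

rate = result_rate_mb_per_s(result_kb=1, runtime_s=2, workers=1000)
print(rate, rate <= INGEST_LIMIT_MB_PER_S)  # 0.5 True
print(round(overhead_fraction(2), 3))       # 0.048, i.e. about 5% lost to overhead
print(overhead_fraction(0.1))               # 0.5, half the time lost for 0.1 s calls
```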
Downloading a large tag with millions of tasks takes too much memory.
Result sets much larger than system memory are not a problem. By default, results are kept in memory until the tag is written to a file. If your dataset does not fit in memory, assign a local file for storage before downloading:

```python
import hmq

tag = hmq.Tag.from_queue("test_d6425b7f-93af-4400-a71d-63a2b5b53698")  # use your tag
tag.to_file("filename.db", stay_linked=True)  # assigns a place to store the data
tag.pull()  # downloads the results
```
The library assumes that the list of task ids (UUIDs) fits into memory. 1 GB of memory is enough for 30 million tasks.
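As a rough check of that figure, taking 1 GB as 10^9 bytes leaves about 33 bytes per task id. For comparison, a UUID occupies 16 bytes in binary form and 32 characters as a hex string without hyphens; how hmq actually stores the ids is not specified here.

```latex
\frac{1\ \text{GB}}{3 \times 10^{7}\ \text{tasks}}
  = \frac{10^{9}\ \text{B}}{3 \times 10^{7}}
  \approx 33\ \text{B per task id}
```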
Queueing System
hmq.api = API() (module attribute)
hmq.API()
hmq.API.delete_tasks(tasks: list[str] = None, tag: str = None)
Deletes tasks from the queue.
Tasks may be identified either by a complete list of task ids or by a tag name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list[str] | List of task ids. Defaults to None. | None |
tag | str | Tag name. Defaults to None. | None |
Returns:
Type | Description |
---|---|
list[str] | List of deleted task ids. |
hmq.API.dequeue_tasks(datacenter: str, packagelists: dict[str, str], maxtasks: int, available: int, allocated: int, running: int, used: int)
hmq.API.fetch_function(function: str)
hmq.API.find_tasks(tag: str)
hmq.API.get_tag_status(tag: str) -> dict[str, int]
Fetch task statistics for a given tag.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tag | str | The tag to inspect. | required |
Returns:
Type | Description |
---|---|
dict[str, int] | Task status counts if successful, None otherwise. |
hmq.API.get_tasks_status(tasks: list[str])
hmq.API.grant_access(signing_request: str, adminkey: str, username: str)
Signs a user request and grants access.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signing_request | str | The signing request generated with setup(). | required |
adminkey | str | The admin key. | required |
username | str | Arbitrary username to identify the user. | required |
Raises:
Type | Description |
---|---|
ValueError | Admin key was not supported. |
hmq.API.missing_dependencies(datacenter, installlist, python_version)
hmq.API.ping()
Tests the connection to the server.
Returns:
Type | Description |
---|---|
bool | Whether the server is reachable. |
hmq.API.register_function(remote_function: dict, digest: str)
hmq.API.retrieve_results(tasks: list[str])
hmq.API.setup(url: str)
Connects to the queueing API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | The server URL. | required |
Raises:
Type | Description |
---|---|
ValueError | Only one instance is supported. |
Returns:
Type | Description |
---|---|
str | The signing request. |
hmq.API.store_results(results: list)
hmq.API.submit_tasks(tag: str, digest: str, calls: list, ncores: int = 1, datacenters: list = None)
hmq.API.sync_tasks(datacenter: str, known: list[str]) -> list[str]
hmq.API.warm_cache(function: str, payload)
hmq.CachedWorker(*args, **kwargs)
Bases: Worker
hmq.CachedWorker.execute_job(job, queue)
hmq.Tag(name)
Represents a collection of tasks.
hmq.Tag.errors: typing.Generator[str, None, None] (property)
Generator yielding all errors.
The ordering remains stable as long as no tasks are added, but it does not match the order of submission.
Returns:
Type | Description |
---|---|
Generator[str, None, None] | Yields errors, or None if the task has not been completed or no error has occurred. |
hmq.Tag.name: str (property, writable)
hmq.Tag.results: typing.Generator[str, None, None] (property)
Generator yielding all results.
The ordering remains stable as long as no tasks are added, but it does not match the order of submission.
Returns:
Type | Description |
---|---|
Generator[str, None, None] | Yields results, or None if the task has not been completed or an error occurred. |
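Because the order of results does not match the order of submission, one common pattern is to have the task return its own inputs alongside the output, so each result can be matched to its arguments. The sketch below is plain Python and hypothetical: results stands in for list(tag.results), it assumes returned values arrive back with their structure intact, and in a real workflow costly_calculation would be decorated with @hmq.task.

```python
from typing import Any, Optional


def costly_calculation(arg1: int, arg2: int) -> dict[str, Any]:
    # Echo the inputs so the result can be matched to them later
    return {"args": (arg1, arg2), "value": arg1 * arg2}


def index_by_args(results: list[Optional[dict[str, Any]]]) -> dict[tuple, Any]:
    """Map input arguments to values, skipping tasks without a result (None)."""
    # tuple() also handles args that came back as a list after serialization
    return {tuple(r["args"]): r["value"] for r in results if r is not None}


# Stand-in for list(tag.results): arbitrary order, one task not finished yet
results = [costly_calculation(6, 7), None, costly_calculation(2, 3)]
by_args = index_by_args(results)
print(by_args[(2, 3)])  # 6
```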
hmq.Tag.__len__()
hmq.Tag.delete()
Delete all remaining tasks from the queue.
This aborts the remaining calculations.
hmq.Tag.from_file(filename: str) -> Tag (staticmethod)
Loads a tag and all results for all tasks from a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Source file. | required |
Returns:
Type | Description |
---|---|
Tag | The loaded tag. |
Raises:
Type | Description |
---|---|
ValueError | If the file does not exist. |
hmq.Tag.from_queue(tag: str) -> Tag (staticmethod)
Loads a tag from the queue by downloading the corresponding tasks.
Does not pull results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tag | str | The tag name. | required |
Returns:
Type | Description |
---|---|
Tag | The populated object. |
hmq.Tag.pull(blocking: bool = False, batchsize: int = 100, tasks_subset: list[str] = None, workers: int = 4) -> int
Downloads data from the queue for all tasks in the tag.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blocking | bool | Whether to retry downloading until all data has been fetched. Defaults to False. | False |
batchsize | int | Number of tasks to download at once. Defaults to 100. | 100 |
tasks_subset | list[str] | Subset of tasks to download. Defaults to None. | None |
workers | int | Number of parallel workers. Defaults to 4. | 4 |
Returns:
Type | Description |
---|---|
int | Number of remaining tasks for which neither result nor error is available. |
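To picture how batchsize and workers interact, the sketch below splits a list of task ids into batches of batchsize, downloads them with a pool of workers threads, and then counts the tasks still lacking data, mirroring the documented return value. This is an illustration only, not hmq's implementation; fetch is a hypothetical callable standing in for the server request that maps each task id to its data (None if nothing is available yet).

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

Fetch = Callable[[list[str]], dict[str, Optional[str]]]


def batched(ids: list[str], batchsize: int) -> list[list[str]]:
    """Split ids into consecutive chunks of at most batchsize items."""
    return [ids[i:i + batchsize] for i in range(0, len(ids), batchsize)]


def pull_in_batches(ids: list[str], fetch: Fetch, batchsize: int = 100,
                    workers: int = 4) -> tuple[dict[str, Optional[str]], int]:
    """Download all batches in parallel; return the data and the remaining count."""
    data: dict[str, Optional[str]] = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for batch_data in pool.map(fetch, batched(ids, batchsize)):
            data.update(batch_data)
    remaining = sum(1 for task_id in ids if data.get(task_id) is None)
    return data, remaining


# Stand-in for the server: every task is done except "task7"
def fake_fetch(batch: list[str]) -> dict[str, Optional[str]]:
    return {t: (None if t == "task7" else "done") for t in batch}


ids = [f"task{i}" for i in range(250)]
data, remaining = pull_in_batches(ids, fake_fetch)
print(remaining)  # 1
```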
hmq.Tag.to_file(filename: str, stay_linked: bool = False)
Exports the current state to a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Export destination. | required |
stay_linked | bool | If True, switches this tag to a file-based database. | False |