Python Celery – How to Get a Task Result Object by ID?

Rate this post

What is Celery?

Celery allows you to scale your application by distributing processing workload among multiple worker machines or processes. Celery uses task queues as units of work. A dedicated worker process monitors those task queues and pulls new work from those if it becomes available. Clients add messages to the task queue and brokers deliver them to workers. You can scale your application by using multiple workers and brokers.

How to Create a Minimal Celery App?

The minimal standard example for creating your first Celery app consists only of 5 lines of code! You import the Celery class and create an app using its constructor. You pass the broker into it and define the task using the decorator @app.task. That’s about it to get started!

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'

How to Get the Current Task Object’s ID?

Each Celery Task object comes with an associated Task.request object. A Task Request contains information and state related to the currently executing task. You can access the identifier of the executing task object via app.Task.request.id.

To get the currently executing task, you can run current_task.request from the current_task module after importing it with from celery import current_task.

Here’s a minimal code example:

from celery import current_task
print(current_task.request)

How to Get a Task Result Object by ID?

You may have multiple task result objects in a task queue. Each task has a unique ID. If you already know the identifier of your desired task object, how to get it by id?

Example: Say, you store a task identifier from celery.result.AsyncResult in a database along with the related task item. With this, you can retrieve all task identifiers that relate to a specific item. After retrieving the task_id from the database, how to retrieve information about the task’s state?

Answer: Using task.AsyncResult is the recommended way of doing this:

result = my_task.AsyncResult(task_id)
x = result.get()

This returns a AsyncResult object using the tasks current result backend. To specify a custom or the current application’s default backend, use app.AsyncResult:

result = app.AsyncResult(task_id)
x = result.get()

Note that the result backend is now deprecated so you should avoid it and use either the RPC backend for RPC-style calls or a persistent backend if you need multi-consumer access to results.” (source)

Need more Python training? Check out our free email academy with cheat sheets, Python lessons, and lots of fun: