Celery
Official Celery documentation: http://docs.celeryproject.org/en/latest/ What is it ==========
Celery is a library we use to perform tasks outside the bounds of an HTTP request.
How to use celery
All celery tasks should go into a tasks.py file or tasks module in a django app.
This ensures that autodiscover_tasks
can find the task and register it with the celery workers.
These tasks should be decorated with one of the following:
@task
defines a task that is called manually (withtask_function_name.delay
in code)@periodic_task
defines a task that is called at some interval (specified bycrontab
in the decorator)@serial_task
defines a task that should only ever have one job running at one time
Best practices
Do not pass objects to celery. Instead, IDs can be passed and the celery task can retrieve the object from the database using the ID. This keeps message lengths short and reduces burden on RabbitMQ as well as preventing tasks from operating on stale data.
Do not specify serializer='pickle'
for new tasks.
This is a deprecated message serializer and by default, we now use JSON.
Queues
Queue |
I/O Bound? |
Target max time-to-start |
Target max time-to-start comments |
Description of usage |
How long does the typical task take to complete? |
Best practices / Notes |
---|---|---|---|---|---|---|
send_report_throttled |
hours |
30 minutes: reports should be sent as close to schedule as possible. EDIT: this queue only affects mvp-* and ews-ghana |
This is used specifically for domains who are abusing Scheduled Reports and overwhelming the background queue. See settings.THROTTLE_SCHED_REPORTS_PATTERNS |
|||
submission_reprocessing_queue |
no? |
hours |
1 hour: not critical if this gets behind as long as it can keep up within a few hours |
Reprocess form submissions that errored in ways that can be handled by HQ. Triggered by ‘submission_reprocessing_queue’ process. |
seconds |
|
sumologic_logs_queue |
yes |
hours |
1 hour: OK for this to get behind |
Forward device logs to sumologic. Triggered by device log submission from mobile. |
seconds |
Non-essential queue |
analytics_queue |
yes |
minutes |
Used to run tasks related to external analytics tools like HubSpot. Triggered by user actions on the site. |
instantaneous (seconds) |
||
reminder_case_update_queue |
minutes |
Run reminder tasks related to case changes. Triggered by case change signal. |
seconds |
|||
reminder_queue |
yes |
minutes |
15 minutes: since these are scheduled it can be important for them to get triggered on time |
Runs the reminder rule tasks for reminders that are due. Triggered by the ‘queue_scheduled_instances’ process. |
seconds |
|
reminder_rule_queue |
minutes |
Run messaging rules after changes to rules. Triggered by changes to rules. |
minutes / hours |
|||
repeat_record_queue |
minutes |
ideally minutes but might be ok if it gets behind during peak |
Run tasks for repeaters. Triggered by repeater queue process. |
seconds |
||
sms_queue |
yes |
minutes |
5 minutes?: depends largely on the messaging. Some messages are more time sensitive than others. We don’t have a way to tell so ideally they should all go out ASAP. |
Used to send SMSs that have been queued. Triggered by ‘run_sms_queue’ process. |
seconds |
|
async_restore_queue |
no |
seconds |
Generate restore response for mobile phones. Gets triggered for sync requests that have async restore flag. |
|||
case_import_queue |
seconds |
Run case imports |
minutes / hours |
|||
email_queue |
yes |
seconds |
generally seconds, since people often blocked on receiving the email (registration workflows for example) |
Send emails. |
seconds |
|
export_download_queue |
seconds |
seconds / minutes |
Used for manually-triggered exports |
minutes |
||
icds_dashboard_reports_queue |
seconds |
fast |
||||
background_queue |
varies wildly |
|||||
beat |
N/A |
|||||
case_rule_queue |
Run case update rules. Triggered by schedule |
minutes / hours |
||||
celery |
||||||
celery_periodic |
Invoice generation: ~2 hours on production. Runs as a single task, once per month. |
I think this is one of the trickiest ones (and most heterogenous) because we run lots of scheduled tasks, that we expect to happen at a certain time, some of which we want at exactly that time and some we are ok with delay in start. |
||||
flower |
N/A |
|||||
icds_aggregation_queue |
yes |
initial task is immediate. follow up tasks are constrained by performance of previous tasks. recommend not tracking |
Run aggregation tasks for ICDS. Triggered by schedule. |
|||
logistics_background_queue |
Custom queue |
|||||
logistics_reminder_queue |
Custom queue |
|||||
saved_exports_queue |
Used only for regularly scheduled exports. Triggered by schedule. |
minutes |
This queue is used only for regularly scheduled exports, which are not user-triggered. The time taken to process a saved export depends on the export itself. We now save the time taken to run the saved export as last_build_duration which can be used to monitor or move the task to a different queue that handles big tasks. Since all exports are triggered at the same time (midnight UTC) the queue gets big. Could be useful to spread these out so that the exports are generated at midnight in the TZ of the domain (see callcenter tasks for where this is already done) |
|||
ucr_indicator_queue |
no |
Used for ICDS very expensive UCRs to aggregate |
||||
ucr_queue |
no |
Used to rebuild UCRs |
minutes to hours |
This is where UCR data source rebuilds occur. Those have an extremely large variation. May be best to split those tasks like “Process 1000 forms/cases, then requeue” so as to not block |
Soil
Soil is a Dimagi utility to provide downloads that are backed by celery.
To use soil:
from soil import DownloadBase
from soil.progress import update_task_state
from soil.util import expose_cached_download
@task
def my_cool_task():
DownloadBase.set_progress(my_cool_task, 0, 100)
# do some stuff
DownloadBase.set_progress(my_cool_task, 50, 100)
# do some more stuff
DownloadBase.set_progress(my_cool_task, 100, 100)
expose_cached_download(payload, expiry, file_extension)
For error handling update the task state to failure and provide errors, HQ currently supports two options:
Option 1
This option raises a celery exception which tells celery to ignore future state updates.
The resulting task result will not be marked as “successful” so task.successful()
will return False
If calling with CELERY_TASKS_ALWAYS_EAGER = True
(i.e. a dev environment), and with .delay()
,
the exception will be caught by celery and task.result
will return the exception.
from celery.exceptions import Ignore
from soil import DownloadBase
from soil.progress import update_task_state
from soil.util import expose_cached_download
@task
def my_cool_task():
try:
# do some stuff
except SomeError as err:
errors = [err]
update_task_state(my_cool_task, states.FAILURE, {'errors': errors})
raise Ignore()
Option 2
This option raises an exception which celery does not catch.
Soil will catch this and set the error to the error message in the exception.
The resulting task will be marked as a failure meaning task.failed()
will return True
If calling with CELERY_TASKS_ALWAYS_EAGER = True
(i.e. a dev environment), the exception will “bubble up” to the calling code.
from soil import DownloadBase
from soil.progress import update_task_state
from soil.util import expose_cached_download
@task
def my_cool_task():
# do some stuff
raise SomeError("my uncool error")
Testing
As noted in the [celery docs](http://docs.celeryproject.org/en/v4.2.1/userguide/testing.html) testing tasks in celery is not the same as in production. In order to test effectively, mocking is required.
An example of mocking with Option 1 from the soil documentation:
@patch('my_cool_test.update_state')
def my_cool_test(update_state):
res = my_cool_task.delay()
self.assertIsInstance(res.result, Ignore)
update_state.assert_called_with(
state=states.FAILURE,
meta={'errors': ['my uncool errors']}
)