Source code for custom.covid.rules.custom_actions

"""
COVID: Available Actions
------------------------

The following actions can be used in messaging in projects using the ``covid`` custom module.
"""
from datetime import datetime

from corehq.apps.domain.models import Domain
from corehq.apps.es.case_search import CaseSearchES
from corehq.apps.es.cases import case_type
from corehq.apps.data_interfaces.models import CaseRuleActionResult
from corehq.apps.hqcase.utils import update_case, AUTO_UPDATE_XMLNS
from corehq.apps.es import filters


[docs]def set_all_activity_complete_date_to_today(case, rule): """ For any case matching the criteria, set the all_activity_complete_date property to today's date, in YYYY-MM-DD format, based on the domain's default time zone. """ if case.get_case_property("all_activity_complete_date"): return CaseRuleActionResult() from dimagi.utils.parsing import ISO_DATE_FORMAT domain_obj = Domain.get_by_name(case.domain) today = datetime.now(domain_obj.get_default_timezone()).strftime(ISO_DATE_FORMAT) (submission, cases) = update_case( case.domain, case.case_id, case_properties={ "all_activity_complete_date": today, }, xmlns=AUTO_UPDATE_XMLNS, device_id=__name__ + ".set_all_activity_complete_date_to_today", form_name=rule.name, ) rule.log_submission(submission.form_id) return CaseRuleActionResult( num_related_updates=1, )
[docs]def close_cases_assigned_to_checkin(checkin_case, rule): """ For any associated checkin case that matches the rule criteria, the following occurs: 1. For all cases of a given type, find all assigned cases. \ An assigned case is a case for which all of the following are true: - Case type patient or contact - Exists in the same domain as the user case - The case property assigned_to_primary_checkin_case_id equals an associated checkin case's case_id 2. For every assigned case, the following case properties are blanked out (set to ""): - assigned_to_primary_checkin_case_id - is_assigned_primary - assigned_to_primary_name - assigned_to_primary_username """ if checkin_case.type != "checkin": return CaseRuleActionResult() blank_properties = { "assigned_to_primary_checkin_case_id": "", "is_assigned_primary": "", "assigned_to_primary_name": "", "assigned_to_primary_username": "", } num_related_updates = 0 for assigned_case_domain, assigned_case_id in _get_assigned_cases(checkin_case): num_related_updates += 1 (submission, cases) = update_case( assigned_case_domain, assigned_case_id, case_properties=blank_properties, xmlns=AUTO_UPDATE_XMLNS, device_id=__name__ + ".close_cases_assigned_to_checkin", form_name=rule.name, ) rule.log_submission(submission.form_id) (close_checkin_submission, cases) = update_case( checkin_case.domain, checkin_case.case_id, close=True, xmlns=AUTO_UPDATE_XMLNS, device_id=__name__ + ".close_cases_assigned_to_checkin", form_name=rule.name, ) rule.log_submission(close_checkin_submission.form_id) return CaseRuleActionResult( num_closes=1, num_related_updates=num_related_updates, )
def _get_assigned_cases(checkin_case): return ( CaseSearchES() .domain(checkin_case.domain) .filter(filters.OR(case_type("patient"), case_type("contact"))) .case_property_query("assigned_to_primary_checkin_case_id", checkin_case.case_id) .values_list('domain', '_id') )