Source code for corehq.apps.es.cases

"""
CaseES
------

Here's an example getting pregnancy cases that are either still open or were
closed after May 1st.

.. code-block:: python

    from corehq.apps.es import cases as case_es

    q = (case_es.CaseES()
         .domain('testproject')
         .case_type('pregnancy')
         .OR(case_es.is_closed(False),
             case_es.closed_range(gte=datetime.date(2015, 05, 01))))
"""
from datetime import datetime

from dimagi.utils.parsing import json_format_datetime

from . import aggregations, filters
from .client import ElasticDocumentAdapter, create_document_adapter
from .const import (
    HQ_CASES_INDEX_CANONICAL_NAME,
    HQ_CASES_INDEX_NAME,
    HQ_CASES_SECONDARY_INDEX_NAME,
)
from .es_query import HQESQuery
from .index.settings import IndexSettingsKey


[docs]class CaseES(HQESQuery): index = HQ_CASES_INDEX_CANONICAL_NAME @property def builtin_filters(self): return [ opened_range, closed_range, modified_range, server_modified_range, case_name, is_closed, case_type, owner, owner_type, user, user_ids_handle_unknown, opened_by, case_ids, active_in_range, ] + super(CaseES, self).builtin_filters
[docs]class ElasticCase(ElasticDocumentAdapter): settings_key = IndexSettingsKey.CASES canonical_name = HQ_CASES_INDEX_CANONICAL_NAME @property def mapping(self): from .mappings.case_mapping import CASE_MAPPING return CASE_MAPPING @property def model_cls(self): from corehq.form_processor.models.cases import CommCareCase return CommCareCase def _from_dict(self, case): """ Takes in case dict and applies required transformation to make it suitable for ES. :param case: an instance of ``dict`` which is ``CommcareCase.to_json()`` """ from corehq.pillows.utils import get_user_type if not case.get("owner_id"): if case.get("user_id"): case["owner_id"] = case["user_id"] case['owner_type'] = get_user_type(case.get("owner_id", None)) case['inserted_at'] = json_format_datetime(datetime.utcnow()) if 'backend_id' not in case: case['backend_id'] = 'sql' return super()._from_dict(case)
case_adapter = create_document_adapter( ElasticCase, HQ_CASES_INDEX_NAME, "case", secondary=HQ_CASES_SECONDARY_INDEX_NAME, )
[docs]def opened_range(gt=None, gte=None, lt=None, lte=None): return filters.date_range('opened_on', gt, gte, lt, lte)
[docs]def closed_range(gt=None, gte=None, lt=None, lte=None): return filters.date_range('closed_on', gt, gte, lt, lte)
[docs]def modified_range(gt=None, gte=None, lt=None, lte=None): return filters.date_range('modified_on', gt, gte, lt, lte)
[docs]def server_modified_range(gt=None, gte=None, lt=None, lte=None): return filters.date_range('server_modified_on', gt, gte, lt, lte)
[docs]def case_name(name): return filters.term('name.exact', name)
[docs]def is_closed(closed=True): return filters.term('closed', closed)
[docs]def case_type(type_): return filters.term('type.exact', type_)
[docs]def owner(owner_id): return filters.term('owner_id', owner_id)
[docs]def owner_type(owner_type): return filters.term('owner_type', owner_type)
[docs]def user(user_id): return filters.term('user_id', user_id)
[docs]def opened_by(user_id): return filters.term('opened_by', user_id)
[docs]def case_ids(case_ids): return filters.term('_id', case_ids)
[docs]def active_in_range(gt=None, gte=None, lt=None, lte=None): """Restricts cases returned to those with actions during the range""" return filters.nested( "actions", filters.date_range("actions.date", gt, gte, lt, lte) )
[docs]def user_ids_handle_unknown(user_ids): missing_users = None in user_ids user_ids = [_f for _f in user_ids if _f] if not missing_users: user_filter = user(user_ids) elif user_ids and missing_users: user_filter = filters.OR( user(user_ids), filters.missing('user_id'), ) else: user_filter = filters.missing('user_id') return user_filter
[docs]def touched_total_aggregation(gt=None, gte=None, lt=None, lte=None): return aggregations.FilterAggregation( 'touched_total', filters.AND( modified_range(gt, gte, lt, lte), ) )
[docs]def open_case_aggregation(name='open_case', gt=None, gte=None, lt=None, lte=None): return aggregations.FilterAggregation( name, filters.AND( modified_range(gt, gte, lt, lte), is_closed(False), ) )