Source code for corehq.apps.es.users

"""
UserES
------

Here's an example adapted from the case list report - it gets a list of the ids
of all unknown users, web users, and demo users on a domain.

.. code-block:: python

    from corehq.apps.es import users as user_es

    user_filters = [
        user_es.unknown_users(),
        user_es.web_users(),
        user_es.demo_users(),
    ]

    query = (user_es.UserES()
             .domain(self.domain)
             .OR(*user_filters)
             .show_inactive())

    owner_ids = query.get_ids()
"""

from . import filters, queries
from .client import ElasticDocumentAdapter, create_document_adapter
from .const import (
    HQ_USERS_INDEX_CANONICAL_NAME,
    HQ_USERS_INDEX_NAME,
    HQ_USERS_SECONDARY_INDEX_NAME,
)
from .es_query import HQESQuery
from .index.settings import IndexSettingsKey


[docs]class UserES(HQESQuery): index = HQ_USERS_INDEX_CANONICAL_NAME default_filters = { 'not_deleted': filters.term("base_doc", "couchuser"), 'active': filters.term("is_active", True), } @property def builtin_filters(self): return [ domain, domains, created, mobile_users, web_users, user_ids, location, login_as_user, last_logged_in, analytics_enabled, is_practice_user, role_id, is_active, username, missing_or_empty_user_data_property, ] + super(UserES, self).builtin_filters
[docs] def show_inactive(self): """Include inactive users, which would normally be filtered out.""" return self.remove_default_filter('active')
[docs] def show_only_inactive(self): query = self.remove_default_filter('active') return query.is_active(False)
[docs]class ElasticUser(ElasticDocumentAdapter): settings_key = IndexSettingsKey.USERS canonical_name = HQ_USERS_INDEX_CANONICAL_NAME @property def model_cls(self): from corehq.apps.users.models import CouchUser return CouchUser @property def mapping(self): from .mappings.user_mapping import USER_MAPPING return USER_MAPPING def _from_dict(self, user_dict): """ Takes a user dict and applies required transfomation to make it suitable for ES. :param user: an instance ``dict`` which is result of ``CouchUser.to_json()`` """ from corehq.apps.groups.dbaccessors import ( get_group_id_name_map_by_user, ) if user_dict['doc_type'] == 'CommCareUser' and '@' in user_dict['username']: user_dict['base_username'] = user_dict['username'].split("@")[0] else: user_dict['base_username'] = user_dict['username'] results = get_group_id_name_map_by_user(user_dict['_id']) user_dict['__group_ids'] = [res.id for res in results] user_dict['__group_names'] = [res.name for res in results] user_dict['user_data_es'] = [] user_dict.pop('password', None) if user_dict.get('base_doc') == 'CouchUser' and user_dict['doc_type'] == 'CommCareUser': user_obj = self.model_cls.wrap_correctly(user_dict) user_data = user_obj.get_user_data(user_obj.domain) for key, value in user_data.items(): user_dict['user_data_es'].append({ 'key': key, 'value': value, }) return super()._from_dict(user_dict)
user_adapter = create_document_adapter( ElasticUser, HQ_USERS_INDEX_NAME, "user", secondary=HQ_USERS_SECONDARY_INDEX_NAME, )
[docs]def domain(domain, allow_enterprise=False): domain_list = [domain] if allow_enterprise: from corehq.apps.enterprise.models import EnterprisePermissions source_domain = EnterprisePermissions.get_source_domain(domain) if source_domain: domain_list.append(source_domain) return domains(domain_list)
[docs]def domains(domains): return filters.OR( filters.term("domain.exact", domains), filters.term("domain_memberships.domain.exact", domains) )
[docs]def analytics_enabled(enabled=True): if enabled: return filters.OR( filters.term("analytics_enabled", True), filters.missing("analytics_enabled") ) else: return filters.term("analytics_enabled", False)
[docs]def username(username): return filters.term("username.exact", username)
[docs]def web_users(): return filters.doc_type("WebUser")
[docs]def mobile_users(): return filters.doc_type("CommCareUser")
[docs]def unknown_users(): """ Return only UnknownUsers. Unknown users are mock users created from xform submissions with unknown user ids. """ return filters.doc_type("UnknownUser")
[docs]def admin_users(): """ Return only AdminUsers. Admin users are mock users created from xform submissions with unknown user ids whose username is "admin". """ return filters.doc_type("AdminUser")
[docs]def demo_users(): """Matches users whose username is demo_user""" return username("demo_user")
[docs]def created(gt=None, gte=None, lt=None, lte=None): return filters.date_range('created_on', gt, gte, lt, lte)
[docs]def last_logged_in(gt=None, gte=None, lt=None, lte=None): return filters.date_range('last_login', gt, gte, lt, lte)
[docs]def user_ids(user_ids): return filters.term("_id", list(user_ids))
[docs]def location(location_id): # by any assigned-location primary or not return filters.OR( filters.AND(mobile_users(), filters.term('assigned_location_ids', location_id)), filters.AND( web_users(), filters.term('domain_memberships.assigned_location_ids', location_id) ), )
[docs]def is_practice_user(practice_mode=True): return filters.term('is_demo_user', practice_mode)
[docs]def role_id(role_id): return filters.OR( filters.term("domain_membership.role_id", role_id), # mobile users filters.term("domain_memberships.role_id", role_id) # web users )
[docs]def is_active(active=True): return filters.term("is_active", active)
def _user_data(key, filter_): # Note: user data does not exist in ES for web users return queries.nested( 'user_data_es', filters.AND( filters.term(field='user_data_es.key', value=key), filter_ ) )
[docs]def query_user_data(key, value): return _user_data(key, queries.match(field='user_data_es.value', search_string=value))
[docs]def login_as_user(value): return _user_data('login_as_user', filters.term('user_data_es.value', value))
[docs]def missing_or_empty_user_data_property(property_name): """ A user_data property doesn't exist, or does exist but has an empty string value. """ missing_property = filters.NOT(queries.nested( 'user_data_es', filters.term(field='user_data_es.key', value=property_name), )) empty_value = _user_data(property_name, filters.term('user_data_es.value', '')) return filters.OR(missing_property, empty_value)