Source code for corehq.pillows.user

import copy
from corehq.apps.change_feed.consumer.feed import KafkaChangeFeed, KafkaCheckpointEventHandler
from corehq.apps.change_feed import topics
from corehq.apps.es.users import user_adapter
from corehq.apps.groups.dbaccessors import get_group_id_name_map_by_user
from corehq.apps.users.models import CommCareUser, CouchUser, WebUser
from corehq.apps.users.util import WEIRD_USER_IDS
from corehq.apps.userreports.data_source_providers import DynamicDataSourceProvider, StaticDataSourceProvider
from corehq.apps.userreports.pillow import get_ucr_processor
from corehq.util.quickcache import quickcache
from corehq.util.doc_processor.couch import CouchDocumentProvider
from pillowtop.checkpoints.manager import get_checkpoint_for_elasticsearch_pillow
from pillowtop.const import DEFAULT_PROCESSOR_CHUNK_SIZE
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors import ElasticProcessor, PillowProcessor
from pillowtop.processors.elastic import BulkElasticProcessor
from pillowtop.reindexer.reindexer import ResumableBulkElasticPillowReindexer
from pillowtop.reindexer.reindexer import ReindexerFactory


def update_unknown_user_from_form_if_necessary(doc_dict):
    if doc_dict is None:
        return

    user_id, username, domain, xform_id = _get_user_fields_from_form_doc(doc_dict)

    if (not user_id
            or user_id in WEIRD_USER_IDS
            or _user_exists_in_couch(user_id)):
        return

    if not user_adapter.exists(user_id):
        doc_type = "AdminUser" if username == "admin" else "UnknownUser"
        doc = {
            "_id": user_id,
            "domain": domain,
            "username": username,
            "first_form_found_in": xform_id,
            "doc_type": doc_type,
        }
        if domain:
            doc["domain_membership"] = {"domain": domain}
        user_adapter.index(doc)


@quickcache(['user_id'])
def _user_exists_in_couch(user_id):
    return CouchUser.get_db().doc_exist(user_id)


def _get_user_fields_from_form_doc(form_doc):
    form_meta = form_doc.get('form', {}).get('meta', {})
    domain = form_doc.get('domain')
    user_id = form_meta.get('userID')
    username = form_meta.get('username')
    xform_id = form_doc.get('_id')
    return user_id, username, domain, xform_id


[docs]class UnknownUsersProcessor(PillowProcessor): """Monitors forms for user_ids we don't know about and creates an entry in ES for the user. Reads from: - Kafka topics: form-sql, form - XForm data source Writes to: - UserES index """ def process_change(self, change): update_unknown_user_from_form_if_necessary(change.get_document())
def add_demo_user_to_user_index(): user_adapter.index( {"_id": "demo_user", "username": "demo_user", "doc_type": "DemoUser"} ) def get_user_es_processor(): return BulkElasticProcessor(user_adapter)
[docs]def get_user_pillow_old(pillow_id='UserPillow', num_processes=1, process_num=0, **kwargs): """Processes users and sends them to ES. Processors: - :py:func:`pillowtop.processors.elastic.ElasticProcessor` """ # todo; To remove after full rollout of https://github.com/dimagi/commcare-hq/pull/21329/ assert pillow_id == 'UserPillow', 'Pillow ID is not allowed to change' checkpoint = get_checkpoint_for_elasticsearch_pillow(pillow_id, user_adapter.index_name, topics.USER_TOPICS) user_processor = ElasticProcessor(user_adapter) change_feed = KafkaChangeFeed( topics=topics.USER_TOPICS, client_id='users-to-es', num_processes=num_processes, process_num=process_num ) return ConstructedPillow( name=pillow_id, checkpoint=checkpoint, change_feed=change_feed, processor=user_processor, change_processed_event_handler=KafkaCheckpointEventHandler( checkpoint=checkpoint, checkpoint_frequency=100, change_feed=change_feed ), )
[docs]def get_user_pillow(pillow_id='user-pillow', num_processes=1, dedicated_migration_process=False, process_num=0, skip_ucr=False, processor_chunk_size=DEFAULT_PROCESSOR_CHUNK_SIZE, **kwargs): """Processes users and sends them to ES and UCRs. Processors: - :py:func:`pillowtop.processors.elastic.BulkElasticProcessor` - :py:func:`corehq.apps.userreports.pillow.ConfigurableReportPillowProcessor` """ # Pillow that sends users to ES and UCR assert pillow_id == 'user-pillow', 'Pillow ID is not allowed to change' checkpoint = get_checkpoint_for_elasticsearch_pillow(pillow_id, user_adapter.index_name, topics.USER_TOPICS) user_processor = get_user_es_processor() ucr_processor = get_ucr_processor( data_source_providers=[ DynamicDataSourceProvider('CommCareUser'), StaticDataSourceProvider('CommCareUser') ], run_migrations=(process_num == 0), # only first process runs migrations, ) change_feed = KafkaChangeFeed( topics=topics.USER_TOPICS, client_id='users-to-es', num_processes=num_processes, process_num=process_num, dedicated_migration_process=dedicated_migration_process ) return ConstructedPillow( name=pillow_id, checkpoint=checkpoint, change_feed=change_feed, processor=[user_processor] if skip_ucr else [ucr_processor, user_processor], change_processed_event_handler=KafkaCheckpointEventHandler( checkpoint=checkpoint, checkpoint_frequency=100, change_feed=change_feed ), processor_chunk_size=processor_chunk_size, process_num=process_num, is_dedicated_migration_process=dedicated_migration_process and (process_num == 0) )
[docs]def get_unknown_users_pillow(pillow_id='unknown-users-pillow', num_processes=1, process_num=0, **kwargs): """This pillow adds users from xform submissions that come in to the User Index if they don't exist in HQ Processors: - :py:class:`corehq.pillows.user.UnknownUsersProcessor` """ # todo; To remove after full rollout of https://github.com/dimagi/commcare-hq/pull/21329/ checkpoint = get_checkpoint_for_elasticsearch_pillow(pillow_id, user_adapter.index_name, topics.FORM_TOPICS) processor = UnknownUsersProcessor() change_feed = KafkaChangeFeed( topics=topics.FORM_TOPICS, client_id='unknown-users', num_processes=num_processes, process_num=process_num ) return ConstructedPillow( name=pillow_id, checkpoint=checkpoint, change_feed=change_feed, processor=processor, change_processed_event_handler=KafkaCheckpointEventHandler( checkpoint=checkpoint, checkpoint_frequency=100, change_feed=change_feed ), )
class UserReindexerFactory(ReindexerFactory): slug = 'user' arg_contributors = [ ReindexerFactory.resumable_reindexer_args, ReindexerFactory.elastic_reindexer_args ] def build(self): iteration_key = "UserToElasticsearchPillow_{}_reindexer".format(user_adapter.index_name) doc_provider = CouchDocumentProvider(iteration_key, [CommCareUser, WebUser]) options = { 'chunk_size': 5 } options.update(self.options) return ResumableBulkElasticPillowReindexer( doc_provider, user_adapter, pillow=get_user_pillow_old(), **options )