Source code for corehq.pillows.synclog

from django.conf import settings
from django.db import DEFAULT_DB_ALIAS

from casexml.apps.phone.models import SyncLogSQL
from corehq.sql_db.util import handle_connection_failure
from dimagi.utils.parsing import string_to_utc_datetime
from pillowtop.checkpoints.manager import KafkaPillowCheckpoint
from pillowtop.feed.interface import Change
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors.interface import PillowProcessor
from pillowtop.reindexer.reindexer import Reindexer, ReindexerFactory

from corehq.apps.change_feed import topics
from corehq.apps.change_feed.consumer.feed import (
    KafkaChangeFeed,
    KafkaCheckpointEventHandler,
)
from corehq.apps.receiverwrapper.util import get_version_from_build_id
from corehq.apps.users.models import (
    CommCareUser,
    CouchUser,
    DeviceAppMeta,
    UserReportingMetadataStaging,
    WebUser,
)
from corehq.apps.users.util import (
    update_device_meta,
    update_last_sync,
    update_latest_builds,
)
from corehq.util.doc_processor.couch import CouchDocumentProvider
from corehq.util.doc_processor.interface import (
    BaseDocProcessor,
    DocumentProcessorController,
)
from settings import SYNCLOGS_SQL_DB_ALIAS

SYNCLOG_SQL_USER_SYNC_GROUP_ID = "synclog_sql_user_sync"


def _synclog_pillow_dbs():
    """Return the set of database aliases the synclog pillow works against.

    Supplied as the ``get_db_aliases`` callable to
    ``handle_connection_failure`` on ``UserSyncHistoryProcessor.process_change``.
    """
    db_aliases = {DEFAULT_DB_ALIAS, SYNCLOGS_SQL_DB_ALIAS}
    return db_aliases


def get_user_sync_history_pillow(
    pillow_id='UpdateUserSyncHistoryPillow',
    num_processes=1,
    process_num=0,
    **kwargs
):
    """Build the synclog pillow.

    Processors:
      - :py:func:`corehq.pillows.synclog.UserSyncHistoryProcessor`

    :param pillow_id: used both as the pillow name and as the checkpoint id.
    :param num_processes: forwarded to the Kafka change feed.
    :param process_num: forwarded to the Kafka change feed.
    :param kwargs: accepted but not used by this function.
    :return: a ``ConstructedPillow`` reading the ``SYNCLOG_SQL`` topic.
    """
    feed = KafkaChangeFeed(
        topics=[topics.SYNCLOG_SQL],
        client_id=SYNCLOG_SQL_USER_SYNC_GROUP_ID,
        num_processes=num_processes,
        process_num=process_num,
    )
    pillow_checkpoint = KafkaPillowCheckpoint(pillow_id, [topics.SYNCLOG_SQL])
    sync_processor = UserSyncHistoryProcessor()
    # The checkpoint is handed to the event handler with a frequency of 100.
    checkpoint_handler = KafkaCheckpointEventHandler(
        checkpoint=pillow_checkpoint,
        checkpoint_frequency=100,
        change_feed=feed,
    )
    return ConstructedPillow(
        name=pillow_id,
        checkpoint=pillow_checkpoint,
        change_feed=feed,
        processor=sync_processor,
        change_processed_event_handler=checkpoint_handler,
    )
class UserSyncHistoryProcessor(PillowProcessor):
    """Updates the user document with reporting metadata when a user syncs

    Note when USER_REPORTING_METADATA_BATCH_ENABLED is True that this is written to a postgres table.
    Entries in that table are then batched and processed separately.

    Reads from:
      - CouchDB (user)
      - SynclogSQL table

    Writes to:
      - CouchDB (user) (when batch processing disabled) (default)
      - UserReportingMetadataStaging (SQL) (when batch processing enabled)
    """

    @handle_connection_failure(get_db_aliases=_synclog_pillow_dbs)
    def process_change(self, change):
        """Record the sync described by ``change`` against the syncing user.

        The change is silently skipped when it has no document, when the
        document lacks ``user_id`` or ``domain``, when its ``date`` cannot be
        parsed, or (in the non-batched path) when the user cannot be found.

        :param change: a change whose ``get_document()`` returns the synclog
            as a dict-like object.
        """
        synclog = change.get_document()
        if not synclog:
            return

        user_id = synclog.get('user_id')
        domain = synclog.get('domain')
        if not user_id or not domain:
            return

        try:
            sync_date = string_to_utc_datetime(synclog.get('date'))
        except (ValueError, AttributeError, TypeError):
            # A synclog without a 'date' passes None to the parser. dateutil
            # reports non-string input as TypeError (presumably what
            # string_to_utc_datetime delegates to -- confirm), so treat it
            # like any other unparseable date and skip the change.
            return

        build_id = synclog.get('build_id')
        device_id = synclog.get('device_id')
        app_id = synclog.get('app_id')

        # WebApps syncs do not provide the app_id.
        # For those syncs we go ahead and mark the last synclog synchronously.
        if app_id and settings.USER_REPORTING_METADATA_BATCH_ENABLED:
            UserReportingMetadataStaging.add_sync(domain, user_id, app_id, build_id, sync_date, device_id)
        else:
            user = CouchUser.get_by_user_id(user_id)
            if not user:
                return

            device_app_meta = None
            if device_id and app_id:
                device_app_meta = DeviceAppMeta(app_id=app_id, build_id=build_id, last_sync=sync_date)
            # sync_date doubles as latest_build_date here.
            mark_last_synclog(domain, user, app_id, build_id, sync_date, sync_date, device_id, device_app_meta)
def mark_last_synclog(domain, user, app_id, build_id, sync_date, latest_build_date, device_id,
                      device_app_meta, commcare_version=None, build_profile_id=None, fcm_token=None,
                      fcm_token_timestamp=None, save_user=True):
    """Apply sync/build/device metadata to ``user`` and optionally save it.

    :param domain: used to resolve ``build_id`` to a version.
    :param user: the user object to update; saved with ``fire_signals=False``
        when something changed and ``save_user`` is true.
    :param app_id: forwarded to the last-sync and latest-build updates.
    :param build_id: when truthy, resolved to a version via
        ``get_version_from_build_id``.
    :param sync_date: when falsy the last-sync update is skipped.
    :param latest_build_date: forwarded to ``update_latest_builds``.
    :param device_id: when falsy the device metadata update is skipped.
    :param device_app_meta: forwarded to ``update_device_meta``.
    :param save_user: set to False to leave saving to the caller.
    :return: the accumulated result of the update helpers (they are expected
        to return truthy when they modified ``user`` -- confirm against
        ``corehq.apps.users.util``), regardless of ``save_user``.
    """
    version = None
    if build_id:
        version = get_version_from_build_id(domain, build_id)

    local_save = False
    if sync_date:
        # sync_date could be null if this is called from a heartbeat request
        local_save |= update_last_sync(user, app_id, sync_date, version)
    if version:
        local_save |= update_latest_builds(user, app_id, latest_build_date, version,
                                           build_profile_id=build_profile_id)

    if device_id:
        # save=False: the single save below covers all updates made here.
        local_save |= update_device_meta(user, device_id, commcare_version=commcare_version,
                                         device_app_meta=device_app_meta, fcm_token=fcm_token,
                                         fcm_token_timestamp=fcm_token_timestamp, save=False)
    if local_save and save_user:
        user.save(fire_signals=False)
    return local_save


class UserSyncHistoryReindexerDocProcessor(BaseDocProcessor):
    """Doc processor that replays a user's recent synclogs through a pillow processor."""

    def __init__(self, pillow_processor):
        # Object whose process_change(change) is called for each synclog.
        self.pillow_processor = pillow_processor

    def process_doc(self, doc):
        """Feed the user's recent synclogs to the pillow processor.

        :param doc: user document; only ``doc['_id']`` is read.
        :return: True when every change was processed, False as soon as
            processing any change raises.
        """
        synclog_changes = self._doc_to_changes(doc)
        for change in synclog_changes:
            try:
                self.pillow_processor.process_change(change)
            except Exception:
                # Report failure through the return value rather than
                # aborting the whole reindex run.
                return False
        return True

    def handle_skip(self, doc):
        """Print the id of a user document that could not be processed; returns True."""
        print('Unable to process user {}'.format(
            doc['_id'],
        ))
        return True

    def _doc_to_changes(self, doc):
        """Build ``Change`` objects for the user's 10 most recent synclogs."""
        # creates a change object for the last 10 synclogs
        # of the given user, for the synclog pillow to process.
        # this means we wont have to iterate through all synclogs
        # when reindexing.
        # Order newest-first so the slice really selects the *last* 10
        # (ascending order would select the oldest 10) ...
        newest_first = SyncLogSQL.objects.filter(user_id=doc['_id']).order_by('-date')[:10]
        # ... then replay them oldest-to-newest, as before.
        synclogs = list(newest_first)[::-1]
        changes = [Change(
            id=res.doc['_id'],
            sequence_id=None,
            document=res.doc
        ) for res in synclogs]
        return changes


class UserSyncHistoryReindexer(Reindexer):
    """Reindexer that runs ``UserSyncHistoryProcessor`` over documents from a provider."""

    def __init__(self, doc_provider, chunk_size=1000, reset=False):
        self.reset = reset
        self.doc_provider = doc_provider
        self.chunk_size = chunk_size
        self.doc_processor = UserSyncHistoryReindexerDocProcessor(UserSyncHistoryProcessor())

    def reindex(self):
        """Run a ``DocumentProcessorController`` over the provider with the stored options."""
        processor = DocumentProcessorController(
            self.doc_provider,
            self.doc_processor,
            reset=self.reset,
            chunk_size=self.chunk_size,
        )
        processor.run()


class UpdateUserSyncHistoryReindexerFactory(ReindexerFactory):
    """Factory for ``UserSyncHistoryReindexer`` registered under the ``user-sync-history`` slug."""

    slug = 'user-sync-history'
    arg_contributors = [
        ReindexerFactory.resumable_reindexer_args,
    ]

    def build(self):
        """Create a reindexer over ``CommCareUser`` and ``WebUser`` documents."""
        iteration_key = "UpdateUserSyncHistoryPillow_reindexer"
        doc_provider = CouchDocumentProvider(iteration_key, doc_type_tuples=[
            CommCareUser,
            WebUser
        ])
        return UserSyncHistoryReindexer(doc_provider, **self.options)