# Source code for corehq.apps.change_feed.pillow

from django.db import IntegrityError
from pillowtop.checkpoints.manager import (
    PillowCheckpoint,
    PillowCheckpointEventHandler,
)
from pillowtop.feed.couch import CouchChangeFeed, populate_change_metadata
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors import PillowProcessor

from corehq.apps.change_feed import data_sources, topics
from corehq.apps.change_feed.producer import ChangeProducer
from corehq.apps.change_feed.topics import get_topic_for_doc_type
from corehq.apps.cleanup.models import DeletedCouchDoc
from corehq.apps.domain.models import Domain
from corehq.apps.users.models import CommCareUser
from corehq.util.couchdb_management import couch_config


class KafkaProcessor(PillowProcessor):
    """Generic processor that forwards CouchDB changes to a kafka topic.

    Reads from:
      - CouchDB change feed

    Writes to:
      - Specified kafka topic
      - DeletedCouchDoc SQL table
    """

    def __init__(self, data_source_type, data_source_name, default_topic):
        self._producer = ChangeProducer()
        self._data_source_type = data_source_type
        self._data_source_name = data_source_name
        self._default_topic = default_topic

    def process_change(self, change):
        populate_change_metadata(change, self._data_source_type, self._data_source_name)
        if not change.metadata:
            return
        doc_type = _get_doc_type_from_change(change)
        # The default topic is used when no doc_type can be determined
        # (e.g. a hard deletion might result in a missing doc_type).
        topic = get_topic_for_doc_type(doc_type, self._data_source_type, self._default_topic)
        self._producer.send_change(topic, change.metadata)
        # Record soft deletions in the DeletedCouchDoc table.
        if change.metadata.is_deletion and doc_type is not None:
            _create_deleted_couch_doc(
                change.id, doc_type, change.metadata.original_publication_datetime)
def get_default_couch_db_change_feed_pillow(pillow_id, **kwargs):
    """Pillow reading the default couch db (no class-specific db)."""
    db = couch_config.get_db(None)
    return get_change_feed_pillow_for_db(pillow_id, db)


def get_user_groups_db_kafka_pillow(pillow_id, **kwargs):
    """Pillow for the couch db holding CommCareUser docs; defaults to the COMMCARE_USER topic."""
    db = couch_config.get_db_for_class(CommCareUser)
    return get_change_feed_pillow_for_db(pillow_id, db, topics.COMMCARE_USER)


def get_domain_db_kafka_pillow(pillow_id, **kwargs):
    """Pillow for the couch db holding Domain docs; defaults to the DOMAIN topic."""
    db = couch_config.get_db_for_class(Domain)
    return get_change_feed_pillow_for_db(pillow_id, db, topics.DOMAIN)


def get_application_db_kafka_pillow(pillow_id, **kwargs):
    """Pillow for the couch db holding Application docs; defaults to the APP topic."""
    # Imported locally, presumably to avoid an import cycle -- TODO confirm.
    from corehq.apps.app_manager.models import Application
    db = couch_config.get_db_for_class(Application)
    return get_change_feed_pillow_for_db(pillow_id, db, topics.APP)
def get_change_feed_pillow_for_db(pillow_id, couch_db, default_topic=None):
    """Build a generic pillow that inserts Couch documents into Kafka.

    Reads from:
      - CouchDB

    Writes to:
      - Kafka

    :param pillow_id: name used for the pillow and its checkpoint
    :param couch_db: couch database whose change feed is consumed
    :param default_topic: kafka topic used when no doc-type-specific
        topic can be determined
    """
    feed = CouchChangeFeed(couch_db)
    checkpoint = PillowCheckpoint(pillow_id, feed.sequence_format)
    event_handler = PillowCheckpointEventHandler(
        checkpoint=checkpoint,
        checkpoint_frequency=100,
    )
    return ConstructedPillow(
        name=pillow_id,
        checkpoint=checkpoint,
        change_feed=feed,
        processor=KafkaProcessor(
            data_source_type=data_sources.SOURCE_COUCH,
            data_source_name=couch_db.dbname,
            default_topic=default_topic,
        ),
        change_processed_event_handler=event_handler,
    )
def _get_doc_type_from_change(change): """ According to past comments, couch change feeds do not consistently include the 'doc' with the published change which makes it hard to determine the doc_type. Try the metadata first, then the 'doc', otherwise we are out of luck """ if change.metadata.document_type: return change.metadata.document_type try: return change['doc']['doc_type'] except KeyError: return None def _create_deleted_couch_doc(doc_id, doc_type, deleted_on): try: DeletedCouchDoc.objects.create(doc_id=doc_id, doc_type=doc_type, deleted_on=deleted_on) except IntegrityError: # if it already exists, ignore it pass