Source code for corehq.pillows.cacheinvalidate

import logging

from dimagi.utils.couch.cache import cache_core
from dimagi.utils.couch.cache.cache_core import GenerationCache
from pillowtop.checkpoints.manager import PillowCheckpoint
from pillowtop.feed.couch import CouchChangeFeed
from pillowtop.models import DjangoPillowCheckpoint
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors import PillowProcessor

from corehq.util.couchdb_management import couch_config

pillow_logging = logging.getLogger("pillowtop")


class FakeCheckpoint(PillowCheckpoint):
    """Checkpoint stand-in that never records progress.

    Every read reports the ``update_seq`` that ``couch_db.info()`` returns at
    the time of the call, while ``update_to``, ``reset`` and ``touch`` do
    nothing.
    """

    def __init__(self, checkpoint_id, couch_db):
        """Set up the checkpoint for ``couch_db``.

        The id handed to the parent class is ``checkpoint_id`` suffixed with
        the database name (``"<checkpoint_id>_<dbname>"``).
        """
        qualified_id = "{}_{}".format(checkpoint_id, couch_db.dbname)
        # NOTE(review): 'text' is presumably the sequence format expected by
        # PillowCheckpoint -- confirm against its constructor.
        super(FakeCheckpoint, self).__init__(qualified_id, 'text')
        self.couch_db = couch_db

    def get_current_sequence_id(self):
        """Return the ``update_seq`` currently reported by the database."""
        db_info = self.couch_db.info()
        return db_info['update_seq']

    def get_or_create_wrapped(self, verify_unchanged=False):
        """Return a newly constructed ``DjangoPillowCheckpoint``.

        Its sequence is the database's current ``update_seq``.
        ``verify_unchanged`` is accepted but not used.
        """
        return DjangoPillowCheckpoint(
            checkpoint_id=self.checkpoint_id,
            sequence=self.get_current_sequence_id(),
        )

    def update_to(self, seq):
        """Do nothing; ``seq`` is ignored."""

    def reset(self):
        """Do nothing."""

    def touch(self, min_interval):
        """Do nothing; ``min_interval`` is ignored."""


class CacheInvalidateProcessor(PillowProcessor):
    """Invalidates cached CouchDB documents

    Reads from:
      - CouchDB

    Writes to:
      - Redis
    """

    def __init__(self):
        # De-duplicate the generation caches: the map is keyed by doc type and
        # only its values are kept here.
        generation_map = GenerationCache.doc_type_generation_map()
        self.gen_caches = set(generation_map.values())

    def get_generations(self):
        """Return one ``"<generation_key> :: <generation>"`` string per cache."""
        snapshot = []
        for gen_cache in self.gen_caches:
            snapshot.append(
                "%s :: %s" % (gen_cache.generation_key, gen_cache._get_generation())
            )
        return snapshot

    def process_change(self, change):
        """Invalidate the cache for the document carried by ``change``."""
        self.process_doc(change.get_document(), change.deleted)

    def process_doc(self, doc, is_deleted):
        """
        Perform the actual cache invalidation for ``doc``.

        Besides being driven by ``process_change``, this is also called
        manually by code that invalidates documents directly.
        """
        doc_id = doc['_id']
        # NOTE(review): ids with this prefix are presumably pillow checkpoint
        # documents -- confirm; they are skipped without touching the cache.
        if doc_id.startswith('pillowtop_corehq.pillows'):
            return None

        # Snapshot the generations on both sides of the invalidation so the
        # debug log can report which ones moved.
        before = set(self.get_generations())
        cache_core.invalidate_doc(doc, deleted=is_deleted)
        after = set(self.get_generations())

        changed = before ^ after
        doc_type = doc.get('doc_type', 'unknown')
        if changed:
            message = "[CacheInvalidate]: Change %s (%s), generation change: %s" % (
                doc_id, doc_type, ', '.join(changed)
            )
        else:
            message = "[CacheInvalidate]: Change %s (%s), no generation change" % (
                doc_id, doc_type
            )
        pillow_logging.debug(message)


def get_main_cache_invalidation_pillow(pillow_id, **kwargs):
    """Build the cache invalidation pillow for the database returned by
    ``couch_config.get_db(None)``, using the ``hqadmin/not_case_form`` filter.

    Extra keyword arguments are accepted and ignored.
    """
    return _get_cache_invalidation_pillow(
        pillow_id,
        couch_config.get_db(None),
        couch_filter="hqadmin/not_case_form",
    )


def get_user_groups_cache_invalidation_pillow(pillow_id, **kwargs):
    """Build the cache invalidation pillow for the database that backs
    ``CommCareUser``; no couch filter is applied.

    Extra keyword arguments are accepted and ignored.
    """
    # Imported inside the function, as in the original code.
    from corehq.apps.users.models import CommCareUser

    users_db = CommCareUser.get_db()
    return _get_cache_invalidation_pillow(pillow_id, users_db)


def _get_cache_invalidation_pillow(pillow_id, couch_db, couch_filter=None):
    """Pillow that listens to changes on ``couch_db`` and invalidates the
    cache, whether what is cached is a single doc or a view.

    ``couch_filter`` is passed through to the ``CouchChangeFeed``.

    Processors:
      - :py:class:`corehq.pillows.cacheinvalidate.CacheInvalidateProcessor`
    """
    return ConstructedPillow(
        name=pillow_id,
        # The checkpoint id is fixed; FakeCheckpoint appends the database name.
        checkpoint=FakeCheckpoint('cache_invalidate_pillow', couch_db),
        change_feed=CouchChangeFeed(couch_db, couch_filter=couch_filter),
        processor=CacheInvalidateProcessor(),
    )