Source code for corehq.pillows.ledger

from functools import lru_cache

from pillowtop.checkpoints.manager import (
    get_checkpoint_for_elasticsearch_pillow,
)
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors import PillowProcessor

from corehq.apps.change_feed import topics
from corehq.apps.change_feed.consumer.feed import (
    KafkaChangeFeed,
    KafkaCheckpointEventHandler,
)
from corehq.apps.export.models.new import LedgerSectionEntry
from corehq.apps.locations.models import SQLLocation
from corehq.util.quickcache import quickcache


@quickcache(['case_id'])
def _location_id_for_case(case_id):
    try:
        return SQLLocation.objects.get(supply_point_id=case_id).location_id
    except SQLLocation.DoesNotExist:
        return None
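
# A hedged usage sketch: supply_point_id ties a supply point case to its
# SQLLocation, so this helper resolves a case id to a location id, and
# quickcache memoizes the result per case_id. The case id and return value
# below are hypothetical placeholders:
#
#     >>> _location_id_for_case('supply-point-case-uuid')  # first call queries SQLLocation
#     'a-location-id'
#     >>> _location_id_for_case('supply-point-case-uuid')  # served from quickcache
#     'a-location-id'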


def _get_daily_consumption_for_ledger(ledger):
    # Local imports, presumably to avoid circular dependencies at module load time.
    from corehq.apps.commtrack.consumption import get_consumption_for_ledger_json
    from corehq.form_processor.backends.sql.dbaccessors import LedgerAccessorSQL

    # Compute the consumption rate, then persist it on the SQL ledger value.
    daily_consumption = get_consumption_for_ledger_json(ledger)
    ledger_value = LedgerAccessorSQL.get_ledger_value(
        ledger['case_id'], ledger['section_id'], ledger['entry_id']
    )
    ledger_value.daily_consumption = daily_consumption
    LedgerAccessorSQL.save_ledger_values([ledger_value])
    return daily_consumption
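
# A minimal sketch of the ledger JSON this helper consumes. Only the keys this
# module itself reads are shown; get_consumption_for_ledger_json may expect
# more, and all values here are hypothetical:
#
#     ledger = {
#         'domain': 'example-domain',
#         'case_id': 'supply-point-case-uuid',
#         'section_id': 'stock',
#         'entry_id': 'product-uuid',
#     }
#     daily_consumption = _get_daily_consumption_for_ledger(ledger)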


def _update_ledger_section_entry_combinations(ledger):
    current_combos = _get_ledger_section_combinations(ledger['domain'])
    if (ledger['section_id'], ledger['entry_id']) in current_combos:
        return

    # use get_or_create because another parallel process may create this row concurrently
    LedgerSectionEntry.objects.get_or_create(
        domain=ledger['domain'],
        section_id=ledger['section_id'],
        entry_id=ledger['entry_id'],
    )

    # clear the lru_cache so that the next ledger save sees the updated combinations
    _get_ledger_section_combinations.cache_clear()


@lru_cache()
def _get_ledger_section_combinations(domain):
    return list(
        LedgerSectionEntry.objects
        .filter(domain=domain)
        .values_list('section_id', 'entry_id')
    )
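
# A hedged sketch of how the two helpers above interact: the lru_cache keeps
# the per-domain combination check cheap on the hot path, and cache_clear() in
# _update_ledger_section_entry_combinations forces a fresh query after a new
# (section_id, entry_id) pair is inserted. The domain name is hypothetical:
#
#     combos = _get_ledger_section_combinations('example-domain')  # cached DB query
#     _update_ledger_section_entry_combinations(ledger)            # may insert + clear cache
#     combos = _get_ledger_section_combinations('example-domain')  # re-queried, sees new pair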


class LedgerProcessor(PillowProcessor):
    """Updates ledger section and entry combinations (exports), daily consumption and case location ids

    Reads from:
      - Kafka topics: ledger
      - Ledger data source

    Writes to:
      - LedgerSectionEntry postgres table
      - Ledger data source
    """

    def process_change(self, change):
        if change.deleted:
            return
        ledger = change.get_document()

        from corehq.apps.commtrack.models import CommtrackConfig
        commtrack_config = CommtrackConfig.for_domain(ledger['domain'])
        if commtrack_config and commtrack_config.use_auto_consumption:
            daily_consumption = _get_daily_consumption_for_ledger(ledger)
            ledger['daily_consumption'] = daily_consumption

        if not ledger.get('location_id') and ledger.get('case_id'):
            ledger['location_id'] = _location_id_for_case(ledger['case_id'])

        _update_ledger_section_entry_combinations(ledger)
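
# A hedged sketch of driving the processor by hand with pillowtop's Change
# object (assuming pillowtop.feed.interface.Change; all field values below are
# hypothetical placeholders):
#
#     from pillowtop.feed.interface import Change
#
#     change = Change(
#         id='ledger-reference-id',
#         sequence_id='42',
#         document={
#             'domain': 'example-domain',
#             'case_id': 'supply-point-case-uuid',
#             'section_id': 'stock',
#             'entry_id': 'product-uuid',
#             'location_id': None,
#         },
#         deleted=False,
#     )
#     LedgerProcessor().process_change(change)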


def get_ledger_to_elasticsearch_pillow(pillow_id='LedgerToElasticsearchPillow',
                                       num_processes=1, process_num=0, **kwargs):
    """Ledger pillow

    Note that this pillow's id references Elasticsearch, but it no longer
    saves to ES. The id is retained so the existing checkpoint stays
    consistent; it can be changed at any time.

    Processors:
      - :py:class:`corehq.pillows.ledger.LedgerProcessor`
    """
    assert pillow_id == 'LedgerToElasticsearchPillow', 'Pillow ID is not allowed to change'
    index_name = "ledgers_2016-03-15"
    checkpoint = get_checkpoint_for_elasticsearch_pillow(
        pillow_id, index_name, [topics.LEDGER]
    )
    change_feed = KafkaChangeFeed(
        topics=[topics.LEDGER],
        client_id='ledgers-to-es',
        num_processes=num_processes,
        process_num=process_num,
    )
    return ConstructedPillow(
        name=pillow_id,
        checkpoint=checkpoint,
        change_feed=change_feed,
        processor=LedgerProcessor(),
        change_processed_event_handler=KafkaCheckpointEventHandler(
            checkpoint=checkpoint,
            checkpoint_frequency=100,
            change_feed=change_feed,
        ),
    )
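
# A hedged usage sketch: in CommCare HQ, pillows like this are normally driven
# by the run_ptop management command (an assumption about deployment); calling
# run() consumes the Kafka feed until interrupted:
#
#     pillow = get_ledger_to_elasticsearch_pillow(num_processes=1, process_num=0)
#     pillow.run()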