Source code for corehq.pillows.sms

from pillowtop.checkpoints.manager import (
    get_checkpoint_for_elasticsearch_pillow,
)
from pillowtop.const import DEFAULT_PROCESSOR_CHUNK_SIZE
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors.elastic import BulkElasticProcessor
from pillowtop.reindexer.reindexer import (
    ReindexerFactory,
    ResumableBulkElasticPillowReindexer,
)

from corehq.apps.change_feed import topics
from corehq.apps.change_feed.consumer.feed import (
    KafkaChangeFeed,
    KafkaCheckpointEventHandler,
)
from corehq.apps.sms.models import SMS
from corehq.elastic import get_es_new
from corehq.form_processor.backends.sql.dbaccessors import ReindexAccessor
from corehq.pillows.mappings.sms_mapping import SMS_INDEX_INFO
from corehq.util.doc_processor.sql import SqlDocumentProvider


def get_sql_sms_pillow(pillow_id='SqlSMSPillow', num_processes=1, process_num=0,
                       processor_chunk_size=DEFAULT_PROCESSOR_CHUNK_SIZE, **kwargs):
    """SMS Pillow

    Assembles a ``ConstructedPillow`` whose change feed is a
    ``KafkaChangeFeed`` on ``topics.SMS`` and whose processor is a
    ``BulkElasticProcessor`` configured with ``SMS_INDEX_INFO``.

    Processors:
      - :py:class:`pillowtop.processors.elastic.BulkElasticProcessor`

    :param pillow_id: must be ``'SqlSMSPillow'``; any other value fails the
        assertion at the top of the function.
    :param num_processes: forwarded to ``KafkaChangeFeed``.
    :param process_num: forwarded to ``KafkaChangeFeed``.
    :param processor_chunk_size: forwarded to ``ConstructedPillow``.
    :param kwargs: accepted but not used by this function.
    :return: the configured ``ConstructedPillow``.
    """
    assert pillow_id == 'SqlSMSPillow', 'Pillow ID is not allowed to change'

    es_checkpoint = get_checkpoint_for_elasticsearch_pillow(
        pillow_id,
        SMS_INDEX_INFO,
        [topics.SMS],
    )

    # Documents are handed to the processor unchanged (identity prep function).
    es_processor = BulkElasticProcessor(
        elasticsearch=get_es_new(),
        index_info=SMS_INDEX_INFO,
        doc_prep_fn=lambda x: x,
    )

    feed_options = {
        'topics': [topics.SMS],
        'client_id': 'sql-sms-to-es',
        'num_processes': num_processes,
        'process_num': process_num,
    }
    kafka_feed = KafkaChangeFeed(**feed_options)

    # The event handler receives the same checkpoint and feed as the pillow.
    checkpoint_handler = KafkaCheckpointEventHandler(
        checkpoint=es_checkpoint,
        checkpoint_frequency=100,
        change_feed=kafka_feed,
    )

    return ConstructedPillow(
        name=pillow_id,
        checkpoint=es_checkpoint,
        change_feed=kafka_feed,
        processor=es_processor,
        change_processed_event_handler=checkpoint_handler,
        processor_chunk_size=processor_chunk_size,
    )
class SMSReindexAccessor(ReindexAccessor):
    """``ReindexAccessor`` implementation backed by the ``SMS`` model."""

    @property
    def model_class(self):
        """The model class this accessor works with (``SMS``)."""
        return SMS

    @property
    def id_field(self):
        """Name of the field used as the document id (``'id'``)."""
        return 'id'

    def get_doc(self, doc_id):
        """Fetch a single ``SMS`` row by primary key.

        Delegates to ``SMS.objects.get(pk=doc_id)``; whatever that call
        raises for a missing row propagates to the caller.
        """
        sms = SMS.objects.get(pk=doc_id)
        return sms


class SmsReindexerFactory(ReindexerFactory):
    """Reindexer factory registered under the ``'sms'`` slug."""

    slug = 'sms'
    arg_contributors = [
        ReindexerFactory.resumable_reindexer_args,
        ReindexerFactory.elastic_reindexer_args,
    ]

    def build(self):
        """Create the resumable bulk reindexer for the SMS index.

        The document provider iterates via ``SMSReindexAccessor`` under an
        iteration key derived from ``SMS_INDEX_INFO.index``. ``self.options``
        is expanded into the reindexer constructor as extra keyword arguments.
        """
        key = f"SmsToElasticsearchPillow_{SMS_INDEX_INFO.index}_reindexer"
        provider = SqlDocumentProvider(key, SMSReindexAccessor())

        # Create the ES client before the pillow, then read self.options last.
        es_client = get_es_new()
        sms_pillow = get_sql_sms_pillow()
        return ResumableBulkElasticPillowReindexer(
            provider,
            elasticsearch=es_client,
            index_info=SMS_INDEX_INFO,
            pillow=sms_pillow,
            **self.options
        )