Source code for corehq.pillows.xform
import collections
from dateutil import parser
from django.conf import settings
from corehq.apps.change_feed.topics import FORM_TOPICS
from corehq.apps.change_feed.consumer.feed import KafkaChangeFeed, KafkaCheckpointEventHandler
from corehq.apps.data_interfaces.pillow import CaseDeduplicationProcessor
from corehq.apps.userreports.data_source_providers import DynamicDataSourceProvider, StaticDataSourceProvider
from corehq.apps.userreports.pillow import get_ucr_processor
from corehq.apps.es.forms import form_adapter
from corehq.apps.es.users import user_adapter
from corehq.form_processor.backends.sql.dbaccessors import FormReindexAccessor
from corehq.pillows.base import is_couch_change_for_sql_domain
from corehq.pillows.user import UnknownUsersProcessor
from corehq.util.doc_processor.sql import SqlDocumentProvider
from couchforms.const import RESERVED_WORDS, DEVICE_LOG_XMLNS
from pillowtop.checkpoints.manager import KafkaPillowCheckpoint, get_checkpoint_for_elasticsearch_pillow
from pillowtop.const import DEFAULT_PROCESSOR_CHUNK_SIZE
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors.form import FormSubmissionMetadataTrackerProcessor
from pillowtop.processors.elastic import BulkElasticProcessor, ElasticProcessor
from pillowtop.reindexer.reindexer import ResumableBulkElasticPillowReindexer, ReindexerFactory
def is_valid_date(txt):
    """Report whether ``txt`` is a non-empty value that dateutil can parse.

    :param txt: candidate date text; any falsy value is rejected without
        being handed to the parser
    :return: ``True`` when ``parser.parse(txt)`` succeeds and yields a truthy
        result, ``False`` otherwise
    """
    try:
        # Best-effort check: any failure while parsing means "not a date".
        return bool(txt and parser.parse(txt))
    except Exception:
        return False
# modified from: http://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', delimiter='/'):
    """Collapse a nested mapping into a single-level dict with path-like keys.

    Nested mutable mappings are descended into recursively and their keys are
    joined to the parent key with ``delimiter``. Keys found in
    ``RESERVED_WORDS`` are skipped at every level, and values that are lists
    are dropped entirely. If two paths produce the same flattened key, the
    later one wins.

    :param d: mapping to flatten
    :param parent_key: prefix already accumulated for ``d``; empty at the top level
    :param delimiter: string placed between the path segments of a key
    :return: a new flat ``dict``; ``d`` itself is not modified
    """
    # ``collections.MutableMapping`` was removed in Python 3.10; the ABC has
    # to be taken from ``collections.abc``. Imported locally so this function
    # does not depend on the module-level import list.
    from collections.abc import MutableMapping

    flat = {}
    for k, v in d.items():
        if k in RESERVED_WORDS:
            continue
        new_key = parent_key + delimiter + k if parent_key else k
        if isinstance(v, MutableMapping):
            # Merge the flattened sub-mapping under the extended prefix.
            flat.update(flatten(v, new_key, delimiter))
        elif not isinstance(v, list):
            flat[new_key] = v
    return flat
def xform_pillow_filter(doc_dict):
    """
    Decide whether a form document should be excluded from processing.

    A document is filtered out when its ``xmlns`` equals ``DEVICE_LOG_XMLNS``,
    when it has no ``domain`` (missing or ``None``), or when its ``form``
    value is ``None``. The ``form`` key is read with plain indexing, so a
    document that passes the first two checks but lacks that key raises
    ``KeyError``.

    :param doc_dict: the form document as a dict
    :return: True to filter out doc
    """
    if doc_dict.get('xmlns') == DEVICE_LOG_XMLNS:
        return True
    if doc_dict.get('domain') is None:
        return True
    return doc_dict['form'] is None
def get_xform_to_elasticsearch_pillow(pillow_id='XFormToElasticsearchPillow', num_processes=1,
                                      process_num=0, **kwargs):
    """XForm change processor that sends form data to Elasticsearch

    Processors:
      - :py:class:`pillowtop.processors.elastic.ElasticProcessor`

    :param pillow_id: name of the pillow, also used to obtain its checkpoint
    :param num_processes: forwarded to the Kafka change feed
    :param process_num: forwarded to the Kafka change feed
    :param kwargs: accepted but not used by this function
    :return: a ``ConstructedPillow`` reading ``FORM_TOPICS`` and checkpointing
        every 100 changes
    """
    es_checkpoint = get_checkpoint_for_elasticsearch_pillow(
        pillow_id, form_adapter.index_name, FORM_TOPICS
    )
    es_processor = ElasticProcessor(
        form_adapter,
        doc_filter_fn=xform_pillow_filter,
        change_filter_fn=is_couch_change_for_sql_domain,
    )
    feed = KafkaChangeFeed(
        topics=FORM_TOPICS,
        client_id='forms-to-es',
        num_processes=num_processes,
        process_num=process_num,
    )
    checkpoint_handler = KafkaCheckpointEventHandler(
        checkpoint=es_checkpoint,
        checkpoint_frequency=100,
        change_feed=feed,
    )
    return ConstructedPillow(
        name=pillow_id,
        checkpoint=es_checkpoint,
        change_feed=feed,
        processor=es_processor,
        change_processed_event_handler=checkpoint_handler,
    )
def get_xform_pillow(pillow_id='xform-pillow', ucr_division=None,
                     include_ucrs=None, exclude_ucrs=None,
                     num_processes=1, process_num=0, ucr_configs=None, skip_ucr=False,
                     processor_chunk_size=DEFAULT_PROCESSOR_CHUNK_SIZE,
                     topics=None, dedicated_migration_process=False, **kwargs):
    """Generic XForm change processor

    Processors (in the order they are attached):
      - :py:class:`pillowtop.processors.elastic.BulkElasticProcessor`
      - :py:class:`corehq.pillows.user.UnknownUsersProcessor`
        (disabled when RUN_UNKNOWN_USER_PILLOW=False)
      - :py:class:`pillowtop.processors.form.FormSubmissionMetadataTrackerProcessor`
        (disabled when RUN_FORM_META_PILLOW=False)
      - :py:class:`corehq.apps.data_interfaces.pillow.CaseDeduplicationProcessor`
        (disabled when RUN_DEDUPLICATION_PILLOW=False)
      - :py:class:`corehq.apps.userreports.pillow.ConfigurableReportPillowProcessor`
        (disabled when skip_ucr=True)

    :param pillow_id: pillow name; also the Kafka client id and the first
        component of the checkpoint id
    :param ucr_division: forwarded to ``get_ucr_processor``
    :param include_ucrs: forwarded to ``get_ucr_processor``
    :param exclude_ucrs: forwarded to ``get_ucr_processor``
    :param num_processes: forwarded to the Kafka change feed
    :param process_num: forwarded to the change feed and the pillow; the UCR
        processor is told to run migrations only when this is 0
    :param ucr_configs: forwarded to ``get_ucr_processor``
    :param skip_ucr: when true the UCR processor is not added to the processor
        list (it is still built and used as the checkpoint callback)
    :param processor_chunk_size: forwarded to the pillow
    :param topics: optional subset of ``FORM_TOPICS``; all form topics are
        used when empty or omitted
    :param dedicated_migration_process: forwarded to the change feed; the
        pillow is flagged as the dedicated migration process only when this
        is set and ``process_num`` is 0
    :param kwargs: accepted but not used by this function
    :return: the assembled ``ConstructedPillow``
    """
    if topics:
        assert set(topics).issubset(FORM_TOPICS), "This is a pillow to process forms only"
    else:
        topics = FORM_TOPICS

    feed = KafkaChangeFeed(
        topics,
        client_id=pillow_id,
        num_processes=num_processes,
        process_num=process_num,
        dedicated_migration_process=dedicated_migration_process,
    )

    form_data_sources = [
        DynamicDataSourceProvider('XFormInstance'),
        StaticDataSourceProvider('XFormInstance'),
    ]
    ucr = get_ucr_processor(
        data_source_providers=form_data_sources,
        ucr_division=ucr_division,
        include_ucrs=include_ucrs,
        exclude_ucrs=exclude_ucrs,
        run_migrations=(process_num == 0),  # only first process runs migrations
        ucr_configs=ucr_configs,
    )
    es_bulk = BulkElasticProcessor(
        form_adapter,
        doc_filter_fn=xform_pillow_filter,
        change_filter_fn=is_couch_change_for_sql_domain,
    )
    # Both optional processors are instantiated up front; the settings flags
    # below only decide whether they are attached to the pillow.
    unknown_users = UnknownUsersProcessor()
    form_meta = FormSubmissionMetadataTrackerProcessor()

    id_parts = (pillow_id, form_adapter.index_name, user_adapter.index_name)
    pillow_checkpoint = KafkaPillowCheckpoint("{}-{}-{}".format(*id_parts), topics)
    handler = KafkaCheckpointEventHandler(
        checkpoint=pillow_checkpoint,
        checkpoint_frequency=1000,
        change_feed=feed,
        checkpoint_callback=ucr,
    )

    pipeline = [es_bulk]
    if settings.RUN_UNKNOWN_USER_PILLOW:
        pipeline.append(unknown_users)
    if settings.RUN_FORM_META_PILLOW:
        pipeline.append(form_meta)
    if settings.RUN_DEDUPLICATION_PILLOW:
        pipeline.append(CaseDeduplicationProcessor())
    if not skip_ucr:
        pipeline.append(ucr)

    return ConstructedPillow(
        name=pillow_id,
        change_feed=feed,
        checkpoint=pillow_checkpoint,
        change_processed_event_handler=handler,
        processor=pipeline,
        processor_chunk_size=processor_chunk_size,
        process_num=process_num,
        is_dedicated_migration_process=dedicated_migration_process and (process_num == 0),
    )
class SqlFormReindexerFactory(ReindexerFactory):
    """Reindexer factory (slug ``sql-form``) that builds a resumable bulk
    reindexer feeding forms into the ``form_adapter`` index."""

    slug = 'sql-form'
    # Argument groups this factory accepts, all supplied by ReindexerFactory.
    arg_contributors = [
        ReindexerFactory.resumable_reindexer_args,
        ReindexerFactory.elastic_reindexer_args,
        ReindexerFactory.limit_db_args,
        ReindexerFactory.domain_arg,
    ]

    def build(self):
        """Create the reindexer from ``self.options``.

        ``limit_to_db`` and ``domain`` are popped out of ``self.options`` (so
        the options object is mutated); they scope the form accessor and are
        embedded in the iteration key. Every remaining option is passed
        through to ``ResumableBulkElasticPillowReindexer`` as a keyword
        argument.

        :return: a ``ResumableBulkElasticPillowReindexer`` that applies
            ``xform_pillow_filter`` to each document
        """
        db_alias = self.options.pop('limit_to_db', None)
        domain_name = self.options.pop('domain', None)

        key = "SqlXFormToElasticsearchPillow_{}_reindexer_{}_{}".format(
            form_adapter.index_name, db_alias or 'all', domain_name or 'all'
        )

        # The accessor takes a list of aliases, or None for no restriction.
        if db_alias:
            db_aliases = [db_alias]
        else:
            db_aliases = None
        accessor = FormReindexAccessor(domain=domain_name, limit_db_aliases=db_aliases)
        provider = SqlDocumentProvider(key, accessor)

        return ResumableBulkElasticPillowReindexer(
            provider,
            form_adapter,
            doc_filter=xform_pillow_filter,
            **self.options
        )