from corehq.apps.change_feed import topics
from corehq.apps.change_feed.consumer.feed import KafkaChangeFeed, KafkaCheckpointEventHandler
from corehq.apps.change_feed.document_types import get_doc_meta_object_from_document, \
change_meta_from_doc_meta_and_document
from corehq.apps.change_feed.data_sources import FORM_SQL
from corehq.apps.users.models import CommCareUser, WebUser
from corehq.apps.reports.analytics.esaccessors import get_last_forms_by_app
from corehq.form_processor.backends.sql.dbaccessors import FormReindexAccessor
from corehq.util.doc_processor.couch import CouchDocumentProvider
from corehq.util.doc_processor.interface import BaseDocProcessor, DocumentProcessorController
from corehq.util.doc_processor.sql import SqlDocumentProvider
from pillowtop.checkpoints.manager import KafkaPillowCheckpoint
from pillowtop.feed.interface import Change
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors.form import FormSubmissionMetadataTrackerProcessor
from pillowtop.reindexer.reindexer import Reindexer, ReindexerFactory
class AppFormSubmissionReindexDocProcessor(BaseDocProcessor):
def __init__(self, pillow_processor, data_source_type, data_source_name):
self.pillow_processor = pillow_processor
self.data_source_type = data_source_type
self.data_source_name = data_source_name
def process_doc(self, doc):
change = self._doc_to_change(doc, self.data_source_type, self.data_source_name)
try:
self.pillow_processor.process_change(change)
except Exception:
return False
else:
return True
def handle_skip(self, doc):
print('Unable to process form {} with build {}'.format(
doc['_id'],
doc.get('build_id')
))
return True
@staticmethod
def _doc_to_change(doc, data_source_type, data_source_name):
doc_meta = get_doc_meta_object_from_document(doc)
change_meta = change_meta_from_doc_meta_and_document(
doc_meta=doc_meta,
document=doc,
data_source_type=data_source_type,
data_source_name=data_source_name,
)
return Change(
id=change_meta.document_id,
sequence_id=None,
document=doc,
deleted=change_meta.is_deletion,
metadata=change_meta,
document_store=None,
)
class AppFormSubmissionReindexer(Reindexer):
reset = False
def __init__(self, doc_provider, data_source_type, data_source_name, chunk_size=1000, reset=False):
self.reset = reset
self.doc_provider = doc_provider
self.chunk_size = chunk_size
self.doc_processor = AppFormSubmissionReindexDocProcessor(
FormSubmissionMetadataTrackerProcessor(),
data_source_type,
data_source_name,
)
def reindex(self):
processor = DocumentProcessorController(
self.doc_provider,
self.doc_processor,
reset=self.reset,
chunk_size=self.chunk_size,
)
processor.run()
class SqlAppFormSubmissionTrackerReindexerFactory(ReindexerFactory):
slug = 'sql-app-form-submission'
arg_contributors = [
ReindexerFactory.resumable_reindexer_args,
]
def build(self):
iteration_key = "SqlAppFormSubmissionTrackerPillow_reindexer"
doc_provider = SqlDocumentProvider(
iteration_key,
FormReindexAccessor(include_attachments=False)
)
return AppFormSubmissionReindexer(
doc_provider, FORM_SQL, 'form_processor_xforminstancesql', **self.options
)
class UserAppFormSubmissionDocProcessor(BaseDocProcessor):
def __init__(self, pillow_processor):
self.pillow_processor = pillow_processor
def process_doc(self, doc):
form_submission_changes = self._doc_to_changes(doc)
for change in form_submission_changes:
try:
self.pillow_processor.process_change(change)
except Exception:
return False
return True
def handle_skip(self, doc):
print('Unable to process user {}'.format(
doc['_id'],
))
return True
def _doc_to_changes(self, doc):
# creates a change object for the last form submission
# for the user to each of their apps.
# this allows us to reindex for the app status report
# without reindexing all forms.
changes = []
forms = get_last_forms_by_app(doc['_id'])
for form in forms:
doc_meta = get_doc_meta_object_from_document(form)
change_meta = change_meta_from_doc_meta_and_document(
doc_meta=doc_meta,
document=form,
data_source_type='elasticsearch',
data_source_name='hqforms',
)
changes.append(Change(
id=change_meta.document_id,
sequence_id=None,
document=form,
deleted=change_meta.is_deletion,
metadata=change_meta,
document_store=None,
))
return changes
class UserAppFormSubmissionReindexer(Reindexer):
def __init__(self, doc_provider, chunk_size=1000, reset=False):
self.reset = reset
self.doc_provider = doc_provider
self.chunk_size = chunk_size
self.doc_processor = UserAppFormSubmissionDocProcessor(FormSubmissionMetadataTrackerProcessor())
def reindex(self):
processor = DocumentProcessorController(
self.doc_provider,
self.doc_processor,
reset=self.reset,
chunk_size=self.chunk_size,
)
processor.run()
class UserAppFormSubmissionReindexerFactory(ReindexerFactory):
slug = 'user-app-form-submission'
arg_contributors = [
ReindexerFactory.resumable_reindexer_args,
]
def build(self):
iteration_key = "UserAppFormSubmissionTrackerPillow_reindexer"
doc_provider = CouchDocumentProvider(iteration_key, doc_type_tuples=[
CommCareUser,
WebUser
])
return UserAppFormSubmissionReindexer(doc_provider, **self.options)