Source code for corehq.motech.openmrs.repeaters

import json
from itertools import chain
from typing import Iterable

from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _

from jsonobject.containers import JsonDict
from memoized import memoized
from requests import RequestException
from urllib3.exceptions import HTTPError

from import extract_case_blocks
from couchforms.signals import successful_form_received
from dimagi.ext.couchdbkit import (

from corehq.apps.locations.dbaccessors import get_one_commcare_user_at_location
from corehq.form_processor.models import CommCareCase, XFormInstance
from corehq.motech.openmrs.const import XMLNS_OPENMRS
from corehq.motech.openmrs.openmrs_config import OpenmrsConfig
from corehq.motech.openmrs.repeater_helpers import (
from corehq.motech.openmrs.workflow import execute_workflow
from corehq.motech.openmrs.workflow_tasks import (
from corehq.motech.repeater_helpers import (
from corehq.motech.repeaters.models import OptionValue, CaseRepeater
from corehq.motech.repeaters.repeater_generators import (
from corehq.motech.repeaters.signals import create_repeat_records
from corehq.motech.utils import pformat_json
from corehq.motech.value_source import (
from corehq.toggles import OPENMRS_INTEGRATION

class AtomFeedStatus(DocumentSchema):
    """
    Schema for the polling status of an OpenMRS Atom feed.

    ``OpenmrsRepeater._validate_openmrs_config()`` wraps each value of
    the repeater's ``atom_feed_status`` dict in this schema and
    validates it before the repeater is saved.
    """
    # Presumably the time of the most recent poll (it is not set in this
    # module); defaults to ``None``.
    last_polled_at = DateTimeProperty(default=None)

    # The first time the feed is polled, don't replay all the changes
    # since OpenMRS was installed. Start from the most recent changes.
    last_page = StringProperty(default='recent')

class OpenmrsRepeater(CaseRepeater):
    """
    Repeater that keeps OpenMRS patients in step with CommCare cases.

    ``OpenmrsRepeater`` updates OpenMRS patients with changes made to
    cases in CommCare, and creates OpenMRS "visits", "encounters" and
    "observations" when a corresponding visit form is submitted in
    CommCare.

    It differs from most repeater classes in three ways:

    1. Like ``CaseRepeater`` it has a case type and updates the OpenMRS
       equivalent of cases, but like ``FormRepeater`` it reads forms.
       That is why it subclasses ``CaseRepeater`` while its payload
       format is ``form_json``.

    2. Each payload results in many API calls.

    3. It can have a location.

    """

    class Meta(object):
        proxy = True
        app_label = 'repeaters'

    friendly_name = _("Forward to OpenMRS")
    payload_generator_classes = (FormRepeaterJsonPayloadGenerator,)

    include_app_id_param = OptionValue(default=False)
    location_id = OptionValue(default='')
    _has_config = True

    # For Atom feed integration to add cases for OpenMRS patients,
    # self.white_listed_case_types must have exactly one case type set,
    # and self.location_id must be set so that a case owner can be
    # determined. The owner is the first CommCareUser instance found at
    # that location.
    atom_feed_enabled = OptionValue(default=False)
    atom_feed_status = OptionValue(default=dict)
    openmrs_config = OptionValue(default=dict)

    @cached_property
    def requests(self):
        """
        A requests object that is not tied to any payload.
        """
        # Used by atom_feed module and views that don't have a payload
        # associated with the request
        # TODO: Drop this. Use repeater.connection_settings.get_requests() instead
        connection_settings = self.connection_settings
        return connection_settings.get_requests()

    @cached_property
    def first_user(self):
        """
        The result of ``get_one_commcare_user_at_location()`` for this
        repeater's domain and location.
        """
        return get_one_commcare_user_at_location(
            self.domain,
            self.location_id,
        )

    @memoized
    def payload_doc(self, repeat_record):
        """
        Fetch the form that ``repeat_record`` refers to.
        """
        form_id = repeat_record.payload_id
        domain = repeat_record.domain
        return XFormInstance.objects.get_form(form_id, domain)

    @property
    def form_class_name(self):
        """
        The class name used to determine which edit form to use
        """
        return "OpenmrsRepeater"

    @classmethod
    def available_for_domain(cls, domain):
        """
        Return whether the ``OPENMRS_INTEGRATION`` toggle is enabled
        for ``domain``.
        """
        return OPENMRS_INTEGRATION.enabled(domain)

    def allowed_to_forward(self, payload):
        """
        Decide whether ``payload`` should be forwarded by this repeater.

        The payload is forwarded when all of the following hold:

        * it did not originate from OpenMRS,
        * ``CaseRepeater`` approves the case types and users of at
          least one of the payload's cases, and
        * this repeater forwards to the right OpenMRS server for at
          least one of the payload's cases.

        :param payload: An XFormInstance (not a case)
        """
        came_from_openmrs = payload.xmlns == XMLNS_OPENMRS
        if came_from_openmrs:
            # Don't send OpenMRS its own data back.
            return False

        case_ids = [block['@case_id'] for block in extract_case_blocks(payload)]
        cases = CommCareCase.objects.get_cases(case_ids, payload.domain, ordered=True)

        for case in cases:
            if CaseRepeater.allowed_to_forward(self, case):
                break
        else:
            # Not one of the case updates in the payload may be
            # forwarded, so drop the payload.
            return False

        if not self.location_id:
            # A repeater without a location receives all payloads.
            return True

        candidate_repeaters = []
        for case in cases:
            candidate_repeaters.extend(get_case_location_ancestor_repeaters(case))
        # If this repeater points to the wrong OpenMRS server for this
        # payload, leave it for the right repeater to handle.
        return self in candidate_repeaters

    def get_payload(self, repeat_record):
        """
        Return the parent class's payload for ``repeat_record``,
        decoded from JSON.
        """
        return json.loads(super().get_payload(repeat_record))

    def send_request(self, repeat_record, payload):
        """
        Send ``payload`` to OpenMRS via ``send_openmrs_data()``.

        Any exception raised while sending is reported with
        ``requests.notify_exception()`` and turned into a 400
        ``RepeaterResponse``.
        """
        case_config = self.openmrs_config['case_config']
        section_names = (
            'patient_identifiers',
            'person_properties',
            'person_preferred_name',
            'person_preferred_address',
            'person_attributes',
        )
        # The list is built eagerly so that every section is looked up
        # before anything else happens.
        value_source_configs: Iterable[JsonDict] = chain(
            *[case_config[name].values() for name in section_names]
        )
        extra_fields = [
            config["case_property"]
            for config in value_source_configs
            if "case_property" in config
        ]
        case_trigger_infos = get_relevant_case_updates_from_form_json(
            self.domain,
            payload,
            case_types=self.white_listed_case_types,
            extra_fields=extra_fields,
            form_question_values=get_form_question_values(payload),
        )
        requests = self.connection_settings.get_requests(repeat_record.payload_id)
        try:
            return send_openmrs_data(
                requests,
                self.domain,
                payload,
                self.openmrs_config,
                case_trigger_infos,
            )
        except Exception as err:
            message = str(err)
            requests.notify_exception(message)
            return RepeaterResponse(400, 'Bad Request', pformat_json(str(err)))

    def _validate_openmrs_config(self):
        """
        Validate ``openmrs_config`` and every ``atom_feed_status`` value
        against their schemas.
        """
        OpenmrsConfig.wrap(self.openmrs_config).validate()
        for feed_status in self.atom_feed_status.values():
            AtomFeedStatus.wrap(feed_status).validate()

    def save(self, *args, **kwargs):
        """
        Validate the OpenMRS configuration, then save.
        """
        self._validate_openmrs_config()
        super().save(*args, **kwargs)
def send_openmrs_data(requests, domain, form_json, openmrs_config, case_trigger_infos):
    """
    Updates an OpenMRS patient and (optionally) creates visits.

    This involves several requests to the OpenMRS REST Web Services.
    If any of those requests fail, we want to roll back previous
    changes to avoid inconsistencies in OpenMRS. To do this we define a
    workflow of tasks we want to do. Each workflow task has a rollback
    task. If a task fails, all previous tasks are rolled back in
    reverse order.

    :param requests: The requests object used for all OpenMRS API calls
        and for error notification.
    :param domain: The CommCare domain of the form.
    :param form_json: The submitted form, as JSON.
    :param openmrs_config: The repeater's OpenMRS configuration.
    :param case_trigger_infos: An iterable of ``CaseTriggerInfo``
        instances, one per case created/updated by the form.
    :return: A response-like object that can be used by
        Repeater.handle_response(), RepeatRecord.handle_success() and
        RepeatRecord.handle_failure()
    """
    warnings = []
    errors = []
    for info in case_trigger_infos:
        assert isinstance(info, CaseTriggerInfo)
        try:
            patient = get_patient(requests, domain, info, openmrs_config)
        except (RequestException, HTTPError) as err:
            # Translate a constant template and interpolate afterwards.
            # Interpolating first (with an f-string) would hand gettext
            # a different msgid for every case and error, which can
            # never be extracted or matched to a translation.
            errors.append(_(
                "Unable to create an OpenMRS patient for case "
                "{case_id!r}: {error}"
            ).format(case_id=info.case_id, error=err))
            continue
        if patient is None:
            warnings.append(
                f"CommCare case '{info.case_id}' was not matched to a "
                f"patient in OpenMRS instance '{requests.base_url}'."
            )
            continue

        # case_trigger_infos are info about all of the cases
        # created/updated by the form. Execute a separate workflow to
        # update each patient.
        workflow = [
            # Update name first. If the current name in OpenMRS fails
            # validation, other API requests will be rejected.
            UpdatePersonNameTask(requests, info, openmrs_config, patient['person']),
            # Update identifiers second. If a current identifier fails
            # validation, other API requests will be rejected.
            SyncPatientIdentifiersTask(requests, info, openmrs_config, patient),
            # Now we should be able to update the rest.
            UpdatePersonPropertiesTask(requests, info, openmrs_config, patient['person']),
            SyncPersonAttributesTask(
                requests, info, openmrs_config, patient['person']['uuid'], patient['person']['attributes']
            ),
        ]
        # Update the preferred address if the person has one, otherwise
        # create it.
        if patient['person']['preferredAddress']:
            workflow.append(
                UpdatePersonAddressTask(requests, info, openmrs_config, patient['person'])
            )
        else:
            workflow.append(
                CreatePersonAddressTask(requests, info, openmrs_config, patient['person'])
            )
        workflow.append(
            CreateVisitsEncountersObsTask(
                requests, domain, info, form_json, openmrs_config, patient['person']['uuid']
            ),
        )

        errors.extend(
            execute_workflow(workflow)
        )

    if errors:
        requests.notify_error(f'Errors encountered sending OpenMRS data: {errors}')
        # If the form included multiple patients, some workflows may
        # have succeeded, but don't say everything was OK if any
        # workflows failed. (Of course most forms will only involve one
        # case, so one workflow.)
        return RepeaterResponse(400, 'Bad Request', "Errors: " + pformat_json([str(e) for e in errors]))

    if warnings:
        # NOTE(review): HTTP 201 is "Created"; "Accepted" is 202. Left
        # unchanged because callers may depend on the status code --
        # confirm which one is intended.
        return RepeaterResponse(201, "Accepted", "Warnings: " + pformat_json([str(e) for e in warnings]))

    return RepeaterResponse(200, "OK")


def create_openmrs_repeat_records(sender, xform, **kwargs):
    """
    Signal receiver that creates ``OpenmrsRepeater`` repeat records for
    ``xform``.
    """
    create_repeat_records(OpenmrsRepeater, xform)


# Run the receiver whenever the ``successful_form_received`` signal is sent.
successful_form_received.connect(create_openmrs_repeat_records)