from typing import Any, Dict, List, Optional, Tuple, Union
import attr
from jsonobject.containers import JsonDict
from jsonpath_ng.ext.parser import parse as parse_jsonpath
from schema import Optional as SchemaOptional
from schema import And, Or, Schema, SchemaError
from couchforms.const import TAG_FORM, TAG_META
from corehq.apps.locations.models import SQLLocation
from corehq.apps.users.cases import get_owner_id, get_wrapped_owner
from corehq.form_processor.models import CommCareCase
from corehq.motech.const import ( # noqa: F401
COMMCARE_DATA_TYPE_DECIMAL,
COMMCARE_DATA_TYPE_INTEGER,
COMMCARE_DATA_TYPE_TEXT,
COMMCARE_DATA_TYPES_AND_UNKNOWN,
DATA_TYPE_UNKNOWN,
DIRECTION_BOTH,
DIRECTION_EXPORT,
DIRECTION_IMPORT,
DIRECTIONS,
)
from .exceptions import ConfigurationError, JsonpathError
from .serializers import serializers
from .utils import simplify_list
@attr.s
class CaseTriggerInfo:
    """
    Attribute container describing a case for value sources.

    Only ``domain`` and ``case_id`` are required. ``updates``,
    ``extra_fields`` and ``form_question_values`` each default to a
    new, empty dict; every other attribute defaults to ``None``.
    """
    domain = attr.ib()
    case_id = attr.ib()
    type = attr.ib(default=None)
    name = attr.ib(default=None)
    owner_id = attr.ib(default=None)
    modified_by = attr.ib(default=None)
    updates = attr.ib(factory=dict)
    created = attr.ib(default=None)
    closed = attr.ib(default=None)
    extra_fields = attr.ib(factory=dict)
    form_question_values = attr.ib(factory=dict)

    def __str__(self):
        # The case name is only included when it is set.
        name_part = f' {self.name!r}' if self.name else ''
        return f'<CaseTriggerInfo {self.case_id}{name_part}>'
def recurse_subclasses(cls):
    """
    Returns a list of all the descendant classes of ``cls``.

    Direct subclasses come first, followed by the descendants of each
    direct subclass in turn. A class that is reachable by more than one
    path (multiple inheritance) is listed once only, at the position
    where it was first encountered.
    """
    children = cls.__subclasses__()
    descendants = list(children)
    for child in children:
        descendants.extend(recurse_subclasses(child))
    # dict preserves insertion order, so this drops repeats without
    # disturbing the order in which classes were first seen.
    return list(dict.fromkeys(descendants))
@attr.s(auto_attribs=True, kw_only=True)
class ValueSource:
    """
    Subclasses model a reference to a value, like a case property or a
    form question.

    Use the ``get_value()`` method to fetch the value using the
    reference, and serialize it, if necessary, for the external system
    that it is being sent to.
    """
    external_data_type: Optional[str] = DATA_TYPE_UNKNOWN
    commcare_data_type: Optional[str] = DATA_TYPE_UNKNOWN
    # Whether the ValueSource is import-only ("in"), export-only ("out"), or
    # for both import and export (the default, None)
    direction: Optional[str] = DIRECTION_BOTH

    # Map CommCare values to remote system values or IDs. e.g.::
    #
    #     {
    #       "case_property": "pill",
    #       "value_map": {
    #         "red": "00ff0000",
    #         "blue": "000000ff",
    #       }
    #     }
    value_map: Optional[dict] = None

    # Used for importing a value from a JSON document.
    jsonpath: Optional[str] = None

    @classmethod
    def wrap(cls, data: dict):
        """
        Allows us to duck-type JsonObject, and useful for doing
        pre-instantiation transforms / dropping unwanted attributes.

        ``data`` itself is left untouched: "doc_type" is dropped from a
        shallow copy, and the remaining items are passed to the class
        as keyword arguments.
        """
        # Copy first so that the caller's config keeps its "doc_type"
        data = dict(data)
        data.pop("doc_type", None)
        return cls(**data)

    @classmethod
    def get_schema_params(cls) -> Tuple[Tuple, Dict]:
        """
        Returns the positional and keyword arguments for building a
        ``Schema`` that validates a config dict for this class.

        A new schema dict is built on every call, so subclasses can
        extend the returned dict.
        """
        args = ({
            SchemaOptional("doc_type"): str,
            SchemaOptional("external_data_type"): str,
            SchemaOptional("commcare_data_type"): Or(*COMMCARE_DATA_TYPES_AND_UNKNOWN),
            SchemaOptional("direction"): Or(*DIRECTIONS),
            SchemaOptional("value_map"): dict,
            SchemaOptional("jsonpath"): str,
        },)
        return args, {}

    @property
    def can_import(self):
        # A falsy direction means no restriction
        return not self.direction or self.direction == DIRECTION_IMPORT

    @property
    def can_export(self):
        # A falsy direction means no restriction
        return not self.direction or self.direction == DIRECTION_EXPORT

    def get_value(self, case_trigger_info: CaseTriggerInfo) -> Any:
        """
        Returns the value referred to by the ValueSource, serialized for
        the external system.
        """
        value = self.get_commcare_value(case_trigger_info)
        return self.serialize(value)

    def get_import_value(self, external_data):
        """
        Returns the value found in ``external_data``, deserialized for
        CommCare.
        """
        external_value = self.get_external_value(external_data)
        return self.deserialize(external_value)

    def get_commcare_value(self, case_trigger_info: CaseTriggerInfo) -> Any:
        """
        Returns the unserialized CommCare value. Not implemented here;
        subclasses in this module override it.
        """
        raise NotImplementedError

    def get_external_value(self, external_data):
        """
        Returns the value(s) that ``jsonpath`` matches in
        ``external_data``, passed through ``simplify_list()``.

        Raises ConfigurationError if ``jsonpath`` is not set, and
        JsonpathError if it cannot be parsed.
        """
        if self.jsonpath is None:
            raise ConfigurationError(f"{self} is not configured to navigate "
                                     "external data")
        try:
            jsonpath = parse_jsonpath(self.jsonpath)
        except Exception as err:
            raise JsonpathError from err
        matches = jsonpath.find(external_data)
        values = [m.value for m in matches]
        return simplify_list(values)

    def set_external_value(self, external_data: dict, info: CaseTriggerInfo):
        """
        Builds ``external_data`` by reference.

        Currently implemented for dicts using JSONPath but could be
        implemented for other types as long as they are mutable.

        Raises ConfigurationError if ``jsonpath`` is not set, and
        JsonpathError if it cannot be parsed.
        """
        if self.jsonpath is None:
            raise ConfigurationError(f"{self} is not configured to navigate "
                                     "external data")
        value = self.get_value(info)
        if value is None:
            # Don't set external value if CommCare has no value
            return
        try:
            jsonpath = parse_jsonpath(self.jsonpath)
        except Exception as err:
            raise JsonpathError from err
        jsonpath.update_or_create(external_data, value)

    def serialize(self, value: Any) -> Any:
        """
        Converts the value's CommCare data type or format to its data
        type or format for the external system, if necessary, otherwise
        returns the value unchanged.

        If ``value_map`` is set, it takes precedence: the mapped value
        is returned, or ``None`` if ``value`` is not in the map.
        """
        if self.value_map:
            return self.value_map.get(value)
        # Prefer a serializer for the exact type pair; fall back to one
        # registered for any source type.
        serializer = (
            serializers.get((self.commcare_data_type, self.external_data_type))
            or serializers.get((None, self.external_data_type))
        )
        return serializer(value) if serializer else value

    def deserialize(self, external_value: Any) -> Any:
        """
        Converts the value's external data type or format to its data
        type or format for CommCare, if necessary, otherwise returns the
        value unchanged.

        If ``value_map`` is set, it takes precedence: the map is
        inverted and the CommCare value is returned, or ``None`` if
        ``external_value`` is not one of the mapped values.
        """
        if self.value_map:
            reverse_map = {v: k for k, v in self.value_map.items()}
            return reverse_map.get(external_value)
        # Prefer a serializer for the exact type pair; fall back to one
        # registered for any source type.
        serializer = (
            serializers.get((self.external_data_type, self.commcare_data_type))
            or serializers.get((None, self.commcare_data_type))
        )
        return serializer(external_value) if serializer else external_value
@attr.s(auto_attribs=True, kw_only=True)
class CaseProperty(ValueSource):
    """
    A reference to a case property value.

    e.g. Get the value of a case property named "dob"::

        {
          "case_property": "dob"
        }

    """
    case_property: str

    @classmethod
    def get_schema_params(cls) -> Tuple[Tuple, Dict]:
        """
        Extends the parent schema with a required, non-empty
        "case_property" string.
        """
        args, kwargs = super().get_schema_params()
        schema, *other_args = args
        schema["case_property"] = And(str, len)
        return (schema, *other_args), kwargs

    def get_commcare_value(self, case_trigger_info: CaseTriggerInfo) -> Any:
        """
        Returns the case property's value from ``updates`` if it is
        there, otherwise from ``extra_fields`` (``None`` if it is in
        neither).
        """
        property_name = self.case_property
        updates = case_trigger_info.updates
        if property_name not in updates:
            return case_trigger_info.extra_fields.get(property_name)
        return updates[property_name]
@attr.s(auto_attribs=True, kw_only=True)
class ConstantValue(ValueSource):
    """
    A ``ValueSource`` whose value is a constant given in its config.

    ``value`` is treated as being of type ``value_data_type``.

    For export, ``get_value()`` returns the constant; set
    ``external_data_type`` to cast the exported value.

    For import, ``get_import_value()`` and ``deserialize()`` return the
    constant; set ``commcare_data_type`` to cast the imported value.

    >>> one = ConstantValue.wrap({
    ...     "value": 1,
    ...     "value_data_type": COMMCARE_DATA_TYPE_INTEGER,
    ...     "commcare_data_type": COMMCARE_DATA_TYPE_DECIMAL,
    ...     "external_data_type": COMMCARE_DATA_TYPE_TEXT,
    ... })
    >>> info = CaseTriggerInfo("test-domain", None)
    >>> one.deserialize("foo")
    1.0
    >>> one.get_value(info)  # Returns '1.0', not '1'. See note below.
    '1.0'

    .. NOTE::
       ``one.get_value(info)`` returns ``'1.0'``, not ``'1'``, because
       ``get_commcare_value()`` casts ``value`` as
       ``commcare_data_type`` first. ``serialize()`` casts it from
       ``commcare_data_type`` to ``external_data_type``.

       This may seem counter-intuitive, but we do it to preserve the
       behaviour of ``ValueSource.serialize()``.

    """
    value: str
    value_data_type: str = COMMCARE_DATA_TYPE_TEXT

    def __eq__(self, other):
        return (
            super().__eq__(other)
            and self.value == other.value
            and self.value_data_type == other.value_data_type
        )

    @classmethod
    def get_schema_params(cls) -> Tuple[Tuple, Dict]:
        """
        Extends the parent schema with a required "value" of any type
        and an optional "value_data_type" string.
        """
        args, kwargs = super().get_schema_params()
        schema, *other_args = args
        schema["value"] = object
        schema[SchemaOptional("value_data_type")] = str
        return (schema, *other_args), kwargs

    def _cast_value(self, to_data_type):
        """
        Returns ``value`` cast from ``value_data_type`` to
        ``to_data_type``, or unchanged if no serializer is registered
        for that conversion.
        """
        serializer = (
            serializers.get((self.value_data_type, to_data_type))
            or serializers.get((None, to_data_type))
        )
        if not serializer:
            return self.value
        return serializer(self.value)

    def get_commcare_value(self, case_trigger_info: CaseTriggerInfo) -> Any:
        # The case is irrelevant; the constant is cast to the CommCare type
        return self._cast_value(self.commcare_data_type)

    def get_external_value(self, external_data):
        # The external data is irrelevant; the constant is cast to the
        # external type
        return self._cast_value(self.external_data_type)

    def deserialize(self, external_value: Any) -> Any:
        """
        Converts the value's external data type or format to its data
        type or format for CommCare, if necessary, otherwise returns the
        value unchanged.

        The ``external_value`` that was passed in is ignored: the
        constant, cast as ``external_data_type``, is deserialized in
        its place.
        """
        constant = self._cast_value(self.external_data_type)
        return super().deserialize(constant)
@attr.s(auto_attribs=True, kw_only=True)
class CaseOwnerAncestorLocationField(ValueSource):
    """
    A reference to a location metadata value. The location may be the
    case owner, the case owner's location, or the first ancestor
    location of the case owner where the metadata value is set.

    e.g. ::

        {
          "doc_type": "CaseOwnerAncestorLocationField",
          "location_field": "openmrs_uuid"
        }

    """
    case_owner_ancestor_location_field: str

    @classmethod
    def wrap(cls, data):
        """
        Accepts the old-style "location_field" key as an alias for
        "case_owner_ancestor_location_field".

        ``data`` itself is left untouched; the key is renamed in a
        shallow copy.
        """
        # Copy first so that the caller's config keeps its original keys
        data = dict(data)
        if "location_field" in data:
            data["case_owner_ancestor_location_field"] = data.pop("location_field")
        return super().wrap(data)

    @classmethod
    def get_schema_params(cls) -> Tuple[Tuple, Dict]:
        """
        Returns a schema that accepts either the old-style config
        ("doc_type" required, with "location_field") or the new-style
        config ("doc_type" optional, with
        "case_owner_ancestor_location_field").
        """
        (schema, *other_args), kwargs = super().get_schema_params()
        # "doc_type" is redefined below with a fixed value
        schema.pop(SchemaOptional("doc_type"))
        old_style = schema.copy()
        old_style.update({
            "doc_type": "CaseOwnerAncestorLocationField",
            "location_field": str,
        })
        new_style = schema.copy()
        new_style.update({
            SchemaOptional("doc_type"): "CaseOwnerAncestorLocationField",
            "case_owner_ancestor_location_field": str,
        })
        schema = Or(old_style, new_style)
        return (schema, *other_args), kwargs

    def get_commcare_value(self, case_trigger_info: CaseTriggerInfo) -> Any:
        """
        Returns the metadata value found for the case's location, or
        ``None`` if the case has no location.
        """
        location = get_case_location(case_trigger_info)
        if not location:
            return None
        return get_ancestor_location_metadata_value(
            location, self.case_owner_ancestor_location_field
        )
@attr.s
class CasePropertyConstantValue(ConstantValue, CaseProperty):
    """
    Combines ``ConstantValue`` and ``CaseProperty``. It adds no
    attributes or methods of its own.
    """
@attr.s(auto_attribs=True, kw_only=True)
class SupercaseValueSource(ValueSource):
    """
    A reference to a list of parent/host cases.

    Evaluates nested ValueSource config, allowing for recursion.
    """
    supercase_value_source: dict

    # Optional filters for indices
    identifier: Optional[str] = None
    referenced_type: Optional[str] = None
    # relationship: Optional[Literal['child', 'extension']] = None  # Py3.8+
    relationship: Optional[str] = None

    @classmethod
    def get_schema_params(cls) -> Tuple[Tuple, Dict]:
        """
        Extends the parent schema with a required, non-empty
        'supercase_value_source' dict and the optional index filters.
        """
        args, kwargs = super().get_schema_params()
        schema, *other_args = args
        schema['supercase_value_source'] = And(dict, len)
        schema[SchemaOptional('identifier')] = str
        schema[SchemaOptional('referenced_type')] = str
        schema[SchemaOptional('relationship')] = lambda r: r in ('child', 'extension')
        return (schema, *other_args), kwargs

    def get_commcare_value(self, info):
        """
        Returns a list with the nested value source's CommCare value
        for each matching supercase.
        """
        return [
            as_value_source(self.supercase_value_source).get_commcare_value(supercase_info)
            for supercase_info in self._iter_supercase_info(info)
        ]

    def get_import_value(self, external_data):
        # OpenMRS Atom feed and FHIR API must build case blocks for
        # related cases for this to be implemented
        raise NotImplementedError

    def set_external_value(self, external_data, info):
        """
        Sets the nested value source's value in ``external_data`` once
        for each matching supercase.
        """
        supercase_infos = self._iter_supercase_info(info)
        for position, supercase_info in enumerate(supercase_infos):
            # A new instance is needed each time, because the counters
            # are substituted into its jsonpath in place.
            nested_source = as_value_source(self.supercase_value_source)
            _subs_counters(nested_source, position)
            nested_source.set_external_value(external_data, supercase_info)

    def _iter_supercase_info(self, info: CaseTriggerInfo):
        """
        Yields a CaseTriggerInfo for each case referenced by a live
        index of the case that passes the configured filters.
        """
        def passes_filters(index):
            # A filter that is not set matches every index
            if self.identifier and index.identifier != self.identifier:
                return False
            if self.referenced_type and index.referenced_type != self.referenced_type:
                return False
            if self.relationship and index.relationship != self.relationship:
                return False
            return True

        case = CommCareCase.objects.get_case(info.case_id, info.domain)
        for index in filter(passes_filters, case.live_indices):
            supercase = CommCareCase.objects.get_case(index.referenced_id, info.domain)
            yield get_case_trigger_info_for_case(
                supercase,
                [self.supercase_value_source],
            )
@attr.s(auto_attribs=True, kw_only=True)
class SubcaseValueSource(ValueSource):
    """
    A reference to a list of child/extension cases.

    Evaluates nested ValueSource config, allowing for recursion.
    """
    subcase_value_source: dict
    case_types: Optional[List[str]] = None
    is_closed: Optional[bool] = None

    @classmethod
    def get_schema_params(cls) -> Tuple[Tuple, Dict]:
        """
        Extends the parent schema with a required, non-empty
        'subcase_value_source' dict and the optional 'case_types' and
        'is_closed' filters.
        """
        args, kwargs = super().get_schema_params()
        schema, *other_args = args
        schema['subcase_value_source'] = And(dict, len)
        schema[SchemaOptional('case_types')] = list
        schema[SchemaOptional('is_closed')] = bool
        return (schema, *other_args), kwargs

    def get_commcare_value(self, info: CaseTriggerInfo) -> Any:
        """
        Returns a list with the nested value source's CommCare value
        for each subcase.
        """
        return [
            as_value_source(self.subcase_value_source).get_commcare_value(subcase_info)
            for subcase_info in self._iter_subcase_info(info)
        ]

    def get_import_value(self, external_data):
        # OpenMRS Atom feed and FHIR API must build case blocks for
        # related cases for this to be implemented
        raise NotImplementedError

    def set_external_value(self, external_data, info):
        """
        Sets the nested value source's value in ``external_data`` once
        for each subcase.
        """
        subcase_infos = self._iter_subcase_info(info)
        for position, subcase_info in enumerate(subcase_infos):
            # A new instance is needed each time, because the counters
            # are substituted into its jsonpath in place.
            nested_source = as_value_source(self.subcase_value_source)
            _subs_counters(nested_source, position)
            nested_source.set_external_value(external_data, subcase_info)

    def _iter_subcase_info(self, info: CaseTriggerInfo):
        """
        Yields a CaseTriggerInfo for each case returned by the reverse
        index lookup for the case, given ``case_types`` and
        ``is_closed``.
        """
        subcases = CommCareCase.objects.get_reverse_indexed_cases(
            info.domain,
            [info.case_id],
            self.case_types,
            self.is_closed,
        )
        yield from (
            get_case_trigger_info_for_case(subcase, [self.subcase_value_source])
            for subcase in subcases
        )
def as_value_source(
    value_source_config: Union[dict, JsonDict],
) -> ValueSource:
    """
    Returns an instance of the first ``ValueSource`` subclass whose
    schema validates ``value_source_config``.

    Raises TypeError if the config does not match any subclass.
    """
    config = value_source_config
    if isinstance(config, JsonDict):
        # JsonDict fails assertion in Schema.validate()
        config = dict(config)

    for candidate in recurse_subclasses(ValueSource):
        try:
            args, kwargs = candidate.get_schema_params()
            validated_config = Schema(*args, **kwargs).validate(config)
        except SchemaError:
            # Not this subclass; try the next one
            continue
        return candidate.wrap(validated_config)

    raise TypeError(f"Unable to determine class for {config!r}")
def get_value(
    value_source_config: JsonDict,
    case_trigger_info: CaseTriggerInfo
) -> Any:
    """
    Builds the value source that ``value_source_config`` defines, and
    returns the value it refers to, serialized for the external system.
    """
    return as_value_source(value_source_config).get_value(case_trigger_info)
def get_import_value(
    value_source_config: JsonDict,
    external_data: dict,  # This may change if/when we support non-JSON APIs
) -> Any:
    """
    Builds the value source that ``value_source_config`` defines, and
    returns the external value it refers to, deserialized for CommCare.
    """
    return as_value_source(value_source_config).get_import_value(external_data)
def deserialize(value_source_config: JsonDict, external_value: Any) -> Any:
    """
    Builds the value source that ``value_source_config`` defines, and
    uses it to convert ``external_value`` from its external data type
    or format to its data type or format for CommCare, if necessary.
    Otherwise the value is returned unchanged.
    """
    return as_value_source(value_source_config).deserialize(external_value)
def get_ancestor_location_metadata_value(location, metadata_key):
    """
    Returns the first truthy value of ``metadata_key`` found in the
    metadata of ``location`` and its ancestors, or ``None`` if none of
    them has one.

    The locations are checked in the reverse of the order that
    ``location.get_ancestors(include_self=True)`` returns them.
    """
    assert isinstance(location, SQLLocation), type(location)
    lineage = location.get_ancestors(include_self=True)
    for candidate in reversed(lineage):
        value = candidate.metadata.get(metadata_key)
        if value:
            return value
    return None
def get_case_location(case):
    """
    Returns the location of the owner of the case.

    If the owner of the case is a location, that location is returned.
    Otherwise the owner's primary location is returned, or ``None`` if
    the owner does not have one.
    """
    owner_id = get_owner_id(case)
    return get_owner_location(case.domain, owner_id)
def get_owner_location(domain, owner_id):
    """
    Returns the owner itself if the owner is a location, otherwise the
    location that the owner's location ID for ``domain`` refers to.

    Returns ``None`` if the owner is not found, or has no location ID.
    """
    owner = get_wrapped_owner(owner_id)
    if not owner:
        return None
    if isinstance(owner, SQLLocation):
        return owner

    location_id = owner.get_location_id(domain)
    if not location_id:
        return None
    return SQLLocation.by_location_id(location_id)
def get_case_trigger_info_for_case(case, value_source_configs):
    """
    Returns a CaseTriggerInfo for ``case``.

    Its ``extra_fields`` hold the value of every case property that is
    named by a "case_property" key in ``value_source_configs``.
    """
    # Configs without a "case_property" key are skipped
    extra_fields = {
        config['case_property']: case.get_case_property(config['case_property'])
        for config in value_source_configs
        if 'case_property' in config
    }
    return CaseTriggerInfo(
        domain=case.domain,
        case_id=case.case_id,
        type=case.type,
        name=case.name,
        owner_id=case.owner_id,
        modified_by=case.modified_by,
        extra_fields=extra_fields,
    )
def _subs_counters(value_source, counter0):
"""
Substitutes "{counter0}" and "{counter1}" in value_source.jsonpath.
counter0 is a 0-indexed counter and counter1 is a 1-indexed counter.
They are used for incrementing indices.
>>> vs = as_value_source({
... 'case_property': 'name',
... 'jsonpath': '$.name[{counter0}].text',
... })
>>> _subs_counters(vs, 3)
>>> vs.jsonpath
'$.name[3].text'
"""
if counter0 is None:
return
if value_source.jsonpath and (
'{counter0}' in value_source.jsonpath
or '{counter1}' in value_source.jsonpath
):
value_source.jsonpath = value_source.jsonpath.format(
counter0=counter0,
counter1=counter0 + 1,
)