Source code for corehq.apps.es.aggregations

"""
Aggregate Queries
-----------------
Aggregations are a replacement for Facets

Here is an example used to calculate how many new pregnancy cases each user has
opened in a certain date range.

.. code-block:: python

    res = (CaseES()
           .domain(self.domain)
           .case_type('pregnancy')
           .date_range('opened_on', gte=startdate, lte=enddate)
           .aggregation(TermsAggregation('by_user', 'opened_by'))
           .size(0)
           .run())

    buckets = res.aggregations.by_user.buckets
    buckets.user1.doc_count

There's a bit of magic happening here - if you'd prefer to skip it, you can
access the raw JSON data for this aggregation via ``res.aggregation('by_user')``.

The ``res`` object has an ``aggregations`` property, which returns a namedtuple
pointing to the wrapped aggregation results.  The name provided at instantiation is
used here (``by_user`` in this example).

The wrapped ``aggregation_result`` object has a ``result`` property containing the
aggregation data, as well as utilities for parsing that data into something more
useful. For example, the ``TermsAggregation`` result also has a ``counts_by_bucket``
method that returns a ``{bucket: count}`` dictionary, which is normally what you
want.
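
For example, continuing the query above, this sketch pulls the per-user
counts out of the result (the user ids and counts are illustrative):

.. code-block:: python

    res.aggregations.by_user.counts_by_bucket()
    # e.g. {'user1': 92, 'user2': 47}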

As of this writing, there's not much else developed, but it's pretty easy to
add support for other aggregation types and more results processing.
"""
import datetime
import re
from collections import defaultdict, namedtuple
from copy import deepcopy

from corehq.apps.es.const import SIZE_LIMIT

MISSING_KEY = None


class AggregationResult(object):

    def __init__(self, raw, aggregation):
        self.aggregation = aggregation
        self.raw = raw
        self.result = raw.get(self.aggregation.name, {})
        self._aggregations = self.aggregation.aggregations


class Aggregation(object):
    name = None
    type = None
    body = None
    result_class = AggregationResult
    aggregations = None

    def __init__(self):
        raise NotImplementedError()

    def aggregation(self, aggregation):
        if not self.aggregations:
            self.aggregations = []

        self.aggregations.append(aggregation)
        return self

    def assemble(self):
        if self.type == "case_property":
            # case_property aggregations assemble their own complete body,
            # including the aggregation type
            assembled = self.body
        else:
            assembled = {self.type: self.body}

        if self.aggregations:
            assembled['aggs'] = {}
            for agg in self.aggregations:
                assembled['aggs'][agg.name] = agg.assemble()

        return assembled

    def parse_result(self, result):
        return self.result_class(result, self)
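

# A minimal sketch of how ``aggregation()`` and ``assemble()`` interact.
# The aggregation names and fields below are illustrative.
def _example_assemble_nested():
    agg = (TermsAggregation('by_user', 'opened_by')
           .aggregation(SumAggregation('total', 'value')))
    # Sub-aggregations are assembled recursively under 'aggs':
    # {'terms': {'field': 'opened_by', 'size': SIZE_LIMIT},
    #  'aggs': {'total': {'sum': {'field': 'value'}}}}
    return agg.assemble()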


class BucketResult(AggregationResult):

    @property
    def keys(self):
        return [b['key'] for b in self.normalized_buckets]

    @property
    def buckets(self):
        n_buckets = self.normalized_buckets
        buckets = namedtuple('buckets', [b['key'] for b in n_buckets])
        return buckets(**{b['key']: Bucket(b, self._aggregations) for b in n_buckets})

    @property
    def buckets_dict(self):
        return {b['key']: Bucket(b, self._aggregations) for b in self.normalized_buckets}

    @property
    def buckets_list(self):
        return [Bucket(b, self._aggregations) for b in self.normalized_buckets]

    @property
    def raw_buckets(self):
        return self.result['buckets']

    @property
    def normalized_buckets(self):
        return self.raw_buckets

    def counts_by_bucket(self):
        return {b['key']: b['doc_count'] for b in self.normalized_buckets}
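

# Sketch of the raw payload shape ``BucketResult`` parses - the standard ES
# terms-aggregation response (keys and counts are illustrative):
def _example_bucket_result():
    raw = {'by_user': {'buckets': [
        {'key': 'user1', 'doc_count': 3},
        {'key': 'user2', 'doc_count': 1},
    ]}}
    agg = TermsAggregation('by_user', 'opened_by')
    return agg.parse_result(raw).counts_by_bucket()  # {'user1': 3, 'user2': 1}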


class MissingResult(AggregationResult):

    @property
    def bucket(self):
        return Bucket(self.result, self._aggregations)


class TopHitsResult(AggregationResult):

    @property
    def raw_hits(self):
        return self.result['hits']['hits']

    @property
    def doc_ids(self):
        """Return just the docs ids from the response."""
        return [r['_id'] for r in self.raw_hits]

    @property
    def hits(self):
        """Return the docs from the response."""
        return [r['_source'] for r in self.raw_hits]

    @property
    def total(self):
        """Return the total number of docs matching the query."""
        return self.result['hits']['total']
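

# Sketch of the raw top_hits payload ``TopHitsResult`` parses (standard ES 5.x
# shape; the documents shown are illustrative):
def _example_top_hits_result():
    raw = {'latest_form': {'hits': {
        'total': 2,
        'hits': [
            {'_id': 'abc123', '_source': {'doc_type': 'XFormInstance'}},
            {'_id': 'def456', '_source': {'doc_type': 'XFormInstance'}},
        ],
    }}}
    result = TopHitsAggregation('latest_form').parse_result(raw)
    return result.doc_ids  # ['abc123', 'def456']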


class StatsResult(AggregationResult):

    @property
    def count(self):
        return self.result['count']

    @property
    def max(self):
        return self.result['max']

    @property
    def min(self):
        return self.result['min']

    @property
    def avg(self):
        return self.result['avg']


class ExtendedStatsResult(StatsResult):

    @property
    def std_dev(self):
        return self.result['std_deviation']


class Bucket(object):

    def __init__(self, result, aggregations):
        self.result = result
        self.aggregations = aggregations

    @property
    def key(self):
        return self.result.get('key', MISSING_KEY)

    @property
    def doc_count(self):
        return self.result['doc_count']

    def __getattr__(self, attr):
        sub_aggregations = [a for a in (self.aggregations or []) if a.name == attr]
        if not sub_aggregations:
            raise AttributeError(attr)
        return sub_aggregations[0].parse_result(self.result)

    def __repr__(self):
        return "Bucket(key='{}', doc_count='{}')".format(self.key, self.doc_count)


class TermsAggregation(Aggregation):
    """
    Bucket aggregation that aggregates by field

    :param name: aggregation name
    :param field: name of the field to bucket on
    :param size: the maximum number of buckets to return; defaults to SIZE_LIMIT
    :param missing: define how documents that are missing a value should be
        treated. By default, they will be ignored. If a value is supplied here,
        it will be used where the value is missing.
    """
    type = "terms"
    result_class = BucketResult

    def __init__(self, name, field, size=None, missing=None):
        assert re.match(r'\w+$', name), \
            "Names must be valid python variable names, was {}".format(name)
        assert size is None or size > 0, "Aggregation size must be greater than 0"
        self.name = name
        self.body = {
            "field": field,
            "size": size if size is not None else SIZE_LIMIT,
        }
        if missing:
            self.body["missing"] = missing

    def order(self, field, order="asc", reset=True):
        query = deepcopy(self)
        order_field = {field: order}

        if reset:
            query.body['order'] = [order_field]
        else:
            if not query.body.get('order'):
                query.body['order'] = []
            query.body['order'].append(order_field)

        return query

    def size(self, size):
        assert size is None or size > 0, "Aggregation size must be greater than 0"
        query = deepcopy(self)
        query.body['size'] = size
        return query


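# Sketch of the copy-on-write ``order``/``size`` modifiers; each call returns
# a new aggregation and leaves the original untouched ('_count' is the
# standard ES ordering key for bucket counts):
def _example_terms_modifiers():
    agg = TermsAggregation('by_user', 'opened_by')
    top_ten = agg.size(10).order('_count', 'desc')
    return top_ten.assemble()
    # => {'terms': {'field': 'opened_by', 'size': 10,
    #               'order': [{'_count': 'desc'}]}}

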
class SumResult(AggregationResult):

    @property
    def value(self):
        return self.result['value']


class SumAggregation(Aggregation):
    """
    Metric aggregation that sums a field

    :param name: aggregation name
    :param field: name of the field to sum
    """
    type = "sum"
    result_class = SumResult

    def __init__(self, name, field):
        assert re.match(r'\w+$', name), \
            "Names must be valid python variable names, was {}".format(name)
        self.name = name
        self.body = {
            "field": field,
        }


class MinAggregation(SumAggregation):
    """
    Metric aggregation that returns the minimum value of a field

    :param name: aggregation name
    :param field: name of the field to take the minimum of
    """
    type = "min"


class MaxAggregation(SumAggregation):
    type = "max"


class AvgAggregation(SumAggregation):
    type = "avg"


class ValueCountAggregation(SumAggregation):
    type = "value_count"


class CardinalityAggregation(SumAggregation):
    type = "cardinality"


class MissingAggregation(Aggregation):
    """
    A field data based single bucket aggregation, that creates a bucket of all
    documents in the current document set context that are missing a field value
    (effectively, missing a field or having the configured NULL value set).

    :param name: aggregation name
    :param field: name of the field to bucket on
    """
    type = "missing"
    result_class = MissingResult

    def __init__(self, name, field):
        assert re.match(r'\w+$', name), \
            "Names must be valid python variable names, was {}".format(name)
        self.name = name
        self.body = {"field": field}


class StatsAggregation(Aggregation):
    """
    Stats aggregation that computes a stats aggregation by field

    :param name: aggregation name
    :param field: name of the field to collect stats on
    :param script: an optional field to allow you to script the computed field
    """
    type = "stats"
    result_class = StatsResult

    def __init__(self, name, field, script=None):
        assert re.match(r'\w+$', name), \
            "Names must be valid python variable names, was {}".format(name)
        self.name = name
        self.body = {"field": field}
        if script:
            self.body.update({'script': script})


class ExtendedStatsAggregation(StatsAggregation):
    """
    Extended stats aggregation that computes an extended stats aggregation by field
    """
    type = "extended_stats"
    result_class = ExtendedStatsResult


class TopHitsAggregation(Aggregation):
    """
    A top_hits metric aggregator keeps track of the most relevant document
    being aggregated. This aggregator is intended to be used as a sub
    aggregator, so that the top matching documents can be aggregated per bucket.

    :param name: Aggregation name
    :param field: This is the field to sort the top hits by. If None, defaults
        to sorting by score.
    :param is_ascending: Whether to sort the hits in ascending or descending order.
    :param size: The number of hits to include. Defaults to 1.
    :param include: An array of fields to include in the hit. Defaults to
        returning the whole document.
    """
    type = "top_hits"
    result_class = TopHitsResult

    def __init__(self, name, field=None, is_ascending=True, size=1, include=None):
        assert re.match(r'\w+$', name), \
            "Names must be valid python variable names, was {}".format(name)
        self.name = name
        self.body = {
            'size': size,
        }
        if field:
            self.body["sort"] = [{
                field: {
                    "order": 'asc' if is_ascending else 'desc'
                },
            }]
        if include:
            self.body["_source"] = {"include": include}


class FilterResult(AggregationResult):

    def __getattr__(self, attr):
        sub_aggregations = [a for a in (self._aggregations or []) if a.name == attr]
        if not sub_aggregations:
            raise AttributeError(attr)
        return sub_aggregations[0].parse_result(self.result)

    @property
    def doc_count(self):
        return self.result['doc_count']


class FilterAggregation(Aggregation):
    """
    Bucket aggregation that creates a single bucket for the specified filter

    :param name: aggregation name
    :param filter: filter body
    """
    type = "filter"
    result_class = FilterResult

    def __init__(self, name, filter):
        self.name = name
        self.body = filter


class FiltersAggregation(Aggregation):
    """
    Bucket aggregation that creates a bucket for each filter specified using
    the filter name.

    :param name: aggregation name
    """
    type = "filters"
    result_class = BucketResult

    def __init__(self, name, filters=None):
        self.name = name
        self.body = {"filters": (filters or {})}

    def add_filter(self, name, filter):
        """
        :param name: filter name
        :param filter: filter body
        """
        self.body["filters"][name] = filter
        return self


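# Sketch of building a keyed filters aggregation (the filter bodies are
# illustrative ES term filters):
def _example_filters_aggregation():
    agg = (FiltersAggregation('case_status')
           .add_filter('open', {'term': {'closed': False}})
           .add_filter('closed', {'term': {'closed': True}}))
    return agg.assemble()
    # => {'filters': {'filters': {
    #        'open': {'term': {'closed': False}},
    #        'closed': {'term': {'closed': True}}}}}

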
class AggregationRange(namedtuple('AggregationRange', 'start end key')):
    """
    Note that a range includes the "start" value and excludes the "end" value.
    i.e. start <= X < end

    :param start: range start
    :param end: range end
    :param key: optional key name for the range
    """
    def __new__(cls, start=None, end=None, key=None):
        assert start or end, "At least one of 'start' or 'end' is required"
        return super(AggregationRange, cls).__new__(cls, start, end, key)

    def assemble(self):
        range_ = {}
        for key, attr in {'from': 'start', 'to': 'end', 'key': 'key'}.items():
            value = getattr(self, attr)
            if value:
                if isinstance(value, datetime.date):
                    value = value.isoformat()
                elif not isinstance(value, str):
                    value = str(value)
                range_[key] = value
        return range_


class RangeResult(BucketResult):

    @property
    def normalized_buckets(self):
        buckets = self.raw_buckets
        if self.aggregation.keyed:
            def _add_key(key, bucket):
                bucket['key'] = key
                return bucket
            return [_add_key(k, b) for k, b in buckets.items()]
        else:
            def _add_key(bucket):
                key = '{}-{}'.format(bucket.get('from', '*'), bucket.get('to', '*'))
                bucket['key'] = key
                return bucket
            return [_add_key(b) for b in buckets]


class RangeAggregation(Aggregation):
    """
    Bucket aggregation that creates one bucket for each range

    :param name: the aggregation name
    :param field: the field to perform the range aggregations on
    :param ranges: list of AggregationRange objects
    :param keyed: set to True to have the results returned by key instead of
        as a list (see RangeResult.normalized_buckets)
    """
    type = "range"
    result_class = RangeResult

    def __init__(self, name, field, ranges=None, keyed=True):
        self.keyed = keyed
        self.name = name
        self.body = {
            'field': field,
            'keyed': keyed,
            'ranges': []
        }
        if ranges:
            for range_ in ranges:
                self.add_range(range_)

    def add_range(self, range_):
        if isinstance(range_, AggregationRange):
            range_ = range_.assemble()

        if range_.get('key'):
            self.body['keyed'] = True

        self.body["ranges"].append(range_)
        return self


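# Sketch of a keyed range aggregation over a date field (field name, dates,
# and keys are illustrative); each range is start-inclusive, end-exclusive:
def _example_range_aggregation():
    agg = RangeAggregation('by_period', 'opened_on', ranges=[
        AggregationRange(end='2020-01-01', key='before_2020'),
        AggregationRange(start='2020-01-01', key='since_2020'),
    ])
    return agg.assemble()
    # => {'range': {'field': 'opened_on', 'keyed': True, 'ranges': [
    #        {'to': '2020-01-01', 'key': 'before_2020'},
    #        {'from': '2020-01-01', 'key': 'since_2020'}]}}

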
class DateHistogramResult(BucketResult):

    @property
    def normalized_buckets(self):
        return [{
            'key': b['key_as_string'],
            'doc_count': b['doc_count'],
        } for b in self.raw_buckets]


_Interval = namedtuple('_Interval', 'interval result_format')


class DateHistogram(Aggregation):
    """
    Aggregate by date range.  This can answer questions like "how many forms
    were created each day?".

    :param name: what do you want to call this aggregation
    :param datefield: the document's date field to look at
    :param interval: the date interval to use - from DateHistogram.Interval
    :param timezone: do bucketing using this time zone instead of UTC
    """
    type = "date_histogram"
    result_class = DateHistogramResult

    class Interval:
        # Feel free to add more options here
        # year, quarter, month, week, day, hour, minute, second
        YEAR = _Interval('year', 'yyyy')
        MONTH = _Interval('month', 'yyyy-MM')
        DAY = _Interval('day', 'yyyy-MM-dd')

    def __init__(self, name, datefield, interval, timezone=None):
        self.name = name
        self.body = {
            'field': datefield,
            'interval': interval.interval,
            'format': interval.result_format,
            'min_doc_count': 1,  # Only include buckets with results
        }
        if timezone:
            self.body['time_zone'] = timezone


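# Sketch of a monthly date histogram (the field name is illustrative):
def _example_date_histogram():
    agg = DateHistogram('by_month', 'received_on', DateHistogram.Interval.MONTH)
    return agg.assemble()
    # => {'date_histogram': {'field': 'received_on', 'interval': 'month',
    #                        'format': 'yyyy-MM', 'min_doc_count': 1}}

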
class GeohashGridAggregation(Aggregation):
    """
    A multi-bucket aggregation that groups ``geo_point`` and ``geo_shape``
    values into buckets that represent a grid.

    More info: `Geohash grid aggregation <https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-aggregations-bucket-geohashgrid-aggregation.html>`_
    """  # noqa: E501
    type = 'geohash_grid'
    result_class = BucketResult

    def __init__(self, name, field, precision):
        """
        Initialize a GeohashGridAggregation

        :param name: The name of this aggregation
        :param field: The case property that stores a geopoint
        :param precision: A value between 1 and 12

        High precision geohashes have a long string length and represent
        cells that cover only a small area (similar to long-format ZIP codes
        like "02139-4075").

        Low precision geohashes have a short string length and represent
        cells that each cover a large area (similar to short-format ZIP codes
        like "02139").
        """
        assert 1 <= precision <= 12
        self.name = name
        self.body = {
            'field': field,
            'precision': precision,
        }


class GeoBoundsAggregation(Aggregation):
    """
    A metric aggregation that computes the bounding box containing all
    geo_point values for a field.

    More info: `Geo Bounds Aggregation <https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-aggregations-metrics-geobounds-aggregation.html>`_
    """  # noqa: E501
    type = 'geo_bounds'

    def __init__(self, name, field):
        self.name = name
        self.body = {
            'field': field,
        }


class NestedAggregation(Aggregation):
    """
    A special single bucket aggregation that enables aggregating nested
    documents.

    :param path: Path to nested document
    """
    type = "nested"
    result_class = FilterResult

    def __init__(self, name, path):
        self.name = name
        self.body = {
            "path": path
        }


AggregationTerm = namedtuple('AggregationTerm', ['name', 'field'])


class NestedTermAggregationsHelper(object):
    """
    Helper to run nested term-based queries (equivalent to SQL group-by clauses).
    This is not at all related to the ES 'nested aggregation'. The final
    aggregation is a count of documents.

    Example usage:

    .. code-block:: python

        # counting all forms submitted in a domain grouped by app id and user id
        NestedTermAggregationsHelper(
            base_query=FormES().domain(domain_name),
            terms=[
                AggregationTerm('app_id', 'app_id'),
                AggregationTerm('user_id', 'form.meta.userID'),
            ]
        ).get_data()

    This works by bucketing docs first by one terms aggregation, then within
    that bucket, bucketing further by the next term, and so on. This is then
    flattened out to appear like a group-by-multiple.
    """

    def __init__(self, base_query, terms):
        self.base_query = base_query
        self.terms = terms

    @property
    def query(self):
        previous_term = None

        for name, field in reversed(self.terms):
            term = TermsAggregation(name, field)
            if previous_term is not None:
                term = term.aggregation(previous_term)
            previous_term = term

        return self.base_query.aggregation(term)

    def get_data(self):
        def _add_terms(aggregation_bucket, term, remaining_terms, current_counts,
                       current_key=None):
            for bucket in getattr(aggregation_bucket, term.name).buckets_list:
                key = (bucket.key,) if current_key is None else current_key + (bucket.key,)
                if remaining_terms:
                    _add_terms(bucket, remaining_terms[0], remaining_terms[1:],
                               current_counts, current_key=key)
                else:
                    # base case: the innermost bucket's count is the row count
                    current_counts[key] += bucket.doc_count

        counts = defaultdict(lambda: 0)
        _add_terms(self.query.size(0).run().aggregations, self.terms[0], self.terms[1:],
                   current_counts=counts)
        return self._format_counts(counts)

    def _format_counts(self, counts):
        final_aggregation_name = 'doc_count'
        row_class = namedtuple('NestedQueryRow',
                               [term.name for term in self.terms] + [final_aggregation_name])
        for combined_key, count in counts.items():
            yield row_class(*(combined_key + (count,)))
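

# Sketch of using the helper, based on the docstring example above. FormES and
# the terms come from that example; the rows it yields are illustrative, e.g.
# NestedQueryRow(app_id='abc123', user_id='u1', doc_count=5).
def _example_nested_terms_helper():
    from corehq.apps.es import FormES  # local import to avoid import cycles
    helper = NestedTermAggregationsHelper(
        base_query=FormES().domain('my-domain'),
        terms=[
            AggregationTerm('app_id', 'app_id'),
            AggregationTerm('user_id', 'form.meta.userID'),
        ],
    )
    # ``helper.query`` nests a 'user_id' terms aggregation inside an
    # 'app_id' one; ``get_data()`` runs it and flattens the buckets.
    return list(helper.get_data())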