Querying Elasticsearch


ESQuery is a library for building elasticsearch queries in a friendly, more readable manner.

Basic usage

There should be a file and subclass of ESQuery for each index we have.

Each method returns a new object, so you can chain calls together like SQLAlchemy. Here’s an example usage:

q = (FormES()
     .source(['xmlns', 'domain', 'app_id'])
     .sort('received_on', desc=False)
     .terms_aggregation('babies.count', 'babies_saved'))
result = q.run()
total_docs = result.total
hits = result.hits
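The chaining behaviour can be pictured with a minimal sketch. `TinyQuery` is a hypothetical stand-in, not the real ESQuery implementation; it only illustrates why returning a new object from each call makes a partially built query safe to reuse.

```python
import copy


class TinyQuery:
    """Hypothetical stand-in for ESQuery; not the real implementation."""

    def __init__(self):
        self._source = None
        self._sort = []

    def _clone(self):
        # Every builder method works on a copy, leaving the original untouched.
        return copy.deepcopy(self)

    def source(self, fields):
        query = self._clone()
        query._source = list(fields)
        return query

    def sort(self, field, desc=False):
        query = self._clone()
        query._sort.append({field: {"order": "desc" if desc else "asc"}})
        return query

    def raw(self):
        return {"_source": self._source, "sort": self._sort}


base = TinyQuery().source(["xmlns", "domain"])
by_date = base.sort("received_on")

print(base.raw())     # the base query still has no sort clause
print(by_date.raw())  # the derived query has one
```

Because `base` is never mutated, it can be shared between several reports that each add their own sorting or filtering.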

Generally useful filters and queries should be abstracted away for re-use, but you can always add your own like so:

q.filter({"some_arbitrary_filter": {...}})
q.set_query({"fancy_query": {...}})

For debugging or more helpful error messages, you can use query.dumps() and query.pprint(), both of which use json.dumps() and are suitable for pasting into ES Head or Marvel or whatever.


Filters are implemented as standalone functions, so they can be composed and nested: q.OR(web_users(), mobile_users()). Filters can be passed to the query.filter method: q.filter(web_users()).

There is some syntactic sugar that lets you skip this boilerplate and just call the filter as if it were a method on the query class: q.web_users(). In order to be available for this shorthand, filters are added to the builtin_filters property of the main query class. I know that’s a bit confusing, but it seemed like the best way to make filters available in both contexts.
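One way such a shorthand could be wired up is sketched below. `SugarQuery`, the body of `web_users`, and the use of `__getattr__` are all assumptions made for illustration; the source does not say how ESQuery implements the lookup.

```python
def term(field, value):
    """Hypothetical filter function returning a filter dict."""
    return {"term": {field: value}}


def web_users():
    # Assumed filter body, for illustration only.
    return term("doc_type", "WebUser")


class SugarQuery:
    """Hypothetical stand-in showing method-style access to filters."""

    def __init__(self, filters=None):
        self._filters = list(filters or [])

    @property
    def builtin_filters(self):
        return [term, web_users]

    def filter(self, filter_):
        # Return a new query rather than mutating this one.
        return SugarQuery(self._filters + [filter_])

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        for fn in self.builtin_filters:
            if fn.__name__ == name:
                return lambda *args, **kwargs: self.filter(fn(*args, **kwargs))
        raise AttributeError(name)


q = SugarQuery()
shorthand = q.web_users()            # filter called as a method
explicit = q.filter(web_users())     # same filter passed explicitly
```

Both spellings produce the same filter list, so the standalone function stays usable for composition while the method form keeps call sites short.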

Generic filters applicable to all indices are available in corehq.apps.es.filters. (Most or all of them can also be accessed as a query method, if appropriate.)

Filtering Specific Indices

There is a file for each elasticsearch index (if not, feel free to add one). This file provides filters specific to that index, as well as an appropriately-directed ESQuery subclass with references to these filters.

These index-specific query classes also have default filters to exclude things like inactive users or deleted docs. These things should nearly always be excluded, but if necessary, you can remove these with remove_default_filters.


  • es_query - the entire query, filters, query, pagination

  • filters - a list of the individual filters

  • query - the query, used for searching, not filtering

  • field - a field on the document. User docs have a ‘domain’ field.

  • lt/gt - less/greater than

  • lte/gte - less/greater than or equal to

class corehq.apps.es.es_query.ESQuery(index=None, for_export=False)[source]

This query builder only outputs the following query structure:

{
  "query": {
    "bool": {
      "filter": {
        "and": [
          <filters>
        ]
      },
      "query": <query>
    }
  },
  <size, sort, other params>
}
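
As a rough illustration of that shape, the sketch below assembles a plain dict from a list of filters, a query, and optional parameters. The `assemble` function and its arguments are hypothetical; it mirrors the structure shown above rather than the builder's actual code, and makes no claim about what a given Elasticsearch version accepts.

```python
import json


def assemble(filters, query, size=None, sort=None):
    """Build a dict in the documented shape (illustrative only)."""
    body = {
        "query": {
            "bool": {
                "filter": {"and": list(filters)},
                "query": query,
            }
        }
    }
    # Extra parameters sit next to "query" at the top level.
    if size is not None:
        body["size"] = size
    if sort:
        body["sort"] = sort
    return body


body = assemble(
    filters=[{"term": {"domain": "demo"}}],
    query={"match_all": {}},
    size=10,
)
print(json.dumps(body, indent=2))
```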
__init__(index=None, for_export=False)[source]
add_query(new_query, clause)[source]

Add a query to the current list of queries


Add the passed-in aggregation to the query

property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))


Returns the JSON query that will be sent to elasticsearch.


Turn off _source retrieval. Mostly useful if you just want the doc_ids


Restrict the fields returned from elasticsearch

Deprecated. Use source instead.


Add the passed-in filter to the query. All filtering goes through this class.

property filters

Return a list of the filters used in this query, suitable if you want to reproduce a query with additional filtering.


Performs a minimal query to get the ids of the matching documents

For very large sets of IDs, use scroll_ids instead

nested_sort(path, field_name, nested_filter, desc=False, reset_sort=True, sort_missing=None)[source]

Order results by the value of a nested field


Pretty-prints the JSON query that will be sent to elasticsearch.


Remove a specific default filter by passing in its name.


Sensible defaults are provided. Use this if you don’t want ‘em


Actually run the query. Returns an ESQuerySet object.


Run the query against the scroll api. Returns an iterator yielding each document that matches the query.


Returns a generator of all matching ids


Returns a ScanResult for all matched documents.

Used for iterating docs for a very large query where consuming the docs via self.scroll() may exceed the amount of time that the scroll context can remain open. This is achieved by:

  1. Fetching the IDs for all matched documents (via scroll_ids()) and caching them in a temporary file on disk, then

  2. fetching the documents by (chunked blocks of) IDs streamed from the temporary file.
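
The two steps above can be sketched as follows. `iter_docs_via_disk`, `iter_ids`, and `fetch_docs` are hypothetical names standing in for the real method, `scroll_ids()`, and a fetch-by-IDs request; the in-memory `store` replaces Elasticsearch so the example runs on its own.

```python
import tempfile
from itertools import islice


def iter_docs_via_disk(iter_ids, fetch_docs, chunk_size=2):
    """Yield docs in two phases: spool ids to disk, then fetch in chunks."""
    with tempfile.TemporaryFile(mode="w+") as id_file:
        # Phase 1: cache every matching id, one per line.
        for doc_id in iter_ids():
            id_file.write(doc_id + "\n")
        id_file.seek(0)

        # Phase 2: stream ids back and fetch documents chunk by chunk.
        ids = (line.rstrip("\n") for line in id_file)
        while True:
            chunk = list(islice(ids, chunk_size))
            if not chunk:
                break
            yield from fetch_docs(chunk)


store = {"a": {"_id": "a"}, "b": {"_id": "b"}, "c": {"_id": "c"}}
docs = list(iter_docs_via_disk(
    iter_ids=lambda: iter(store),
    # Ids that no longer exist when fetched are skipped silently.
    fetch_docs=lambda chunk: [store[i] for i in chunk if i in store],
))
```

Only the ID pass depends on a long-lived scroll; the document fetches are ordinary chunked requests, which is why a slow consumer no longer risks the scroll context expiring.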

Original design PR: https://github.com/dimagi/commcare-hq/pull/20282

Caveats:

  • There is no guarantee that the returned ScanResult’s count property will match the number of yielded docs.

  • Documents that are present when scroll_ids() is called, but are deleted prior to being fetched in full, will be missing from the results, and this scenario will not raise an exception.

  • If Elastic document ID values are ever reused (i.e. new documents are created with the same ID as a previously-deleted document) then this method would become unsafe because it could yield documents that were not matched by the query.

search_string_query(search_string, default_fields)[source]

Accepts a user-defined search string


Set the query. Most stuff we want is better done with filters, but if you actually want Levenshtein distance or prefix querying…


To be used with get_sorting_block, which interprets datatables sorting


Restrict number of results returned. Analogous to SQL limit, except when performing a scroll, in which case this value becomes the number of results to fetch per scroll request.

sort(field, desc=False, reset_sort=True)[source]

Order the results by field.

source(include, exclude=None)[source]

Restrict the output of _source in the queryset. This can be used to return an object in a queryset


Pagination. Analogous to SQL offset.


modeled after django’s QuerySet.values

class corehq.apps.es.es_query.ESQuerySet(raw, query)[source]
The object returned from ESQuery.run
  • ESQuerySet.raw is the raw response from elasticsearch

  • ESQuerySet.query is the ESQuery object

__init__(raw, query)[source]
property doc_ids

Return just the doc ids from the response.

property hits

Return the docs from the response.

static normalize_result(query, result)[source]

Return the doc from an item in the query response.

property total

Return the total number of docs matching the query.

class corehq.apps.es.es_query.HQESQuery(index=None, for_export=False)[source]

Query logic specific to CommCareHQ

property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

exception corehq.apps.es.es_query.InvalidQueryError[source]

Query parameters cannot be assembled into a valid search.

Available Filters

The following filters are available on any ESQuery instance - you can chain any of these on your query.

Note also that the term filter accepts either a list or a single element. Simple filters which match against a field are based on this filter, so those will also accept lists. That means you can do form_query.xmlns(XMLNS1) or form_query.xmlns([XMLNS1, XMLNS2, ...]).
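
A minimal sketch of how a list-or-single-value filter might behave is shown below. The function bodies are assumptions for illustration; only the `term` and `terms` filter names are standard Elasticsearch syntax.

```python
def term(field, value):
    """Illustrative only: pick `term` or `terms` based on the value type."""
    if isinstance(value, (list, tuple, set)):
        return {"terms": {field: list(value)}}
    return {"term": {field: value}}


def xmlns(value):
    # A simple field filter built on top of term, so it accepts lists too.
    return term("xmlns", value)


single = xmlns("http://example.com/form1")
several = xmlns(["http://example.com/form1", "http://example.com/form2"])
```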

Contributing: Additions to this file should be added to the builtin_filters method on either ESQuery or HQESQuery, as appropriate (is it an HQ thing?).


Filter docs to match all of the filters passed in


Exclude docs matching the filter passed in


Filter docs to match any of the filters passed in

corehq.apps.es.filters.date_range(field, gt=None, gte=None, lt=None, lte=None)[source]

Range filter that accepts date and datetime objects as arguments


Filter by doc_id. Also accepts a list of doc ids


Filter by doc_type. Also accepts a list


Filter by domain.


Only return docs with a missing or null value for field


Only return docs which have a value for field

corehq.apps.es.filters.geo_bounding_box(field, top_left, bottom_right)[source]

Only return geopoints stored in field that are located within the bounding box defined by top_left and bottom_right.

top_left and bottom_right accept a range of data types and formats.

More info: Geo Bounding Box Query

corehq.apps.es.filters.geo_grid(field, geohash)[source]

Filters cases by the geohash grid cell in which they are located.

corehq.apps.es.filters.geo_polygon(field, points)[source]

Filters geo_point values in field that fall within the polygon described by the list of points.

More info: Geo Polygon Query

  • field – A field with Elasticsearch data type geo_point.

  • points – A list of points that describe a polygon. Elasticsearch supports a range of formats for list items.


A filter dict.

corehq.apps.es.filters.geo_shape(field, shape, relation='intersects')[source]

Filters cases by case properties indexed using the geo_point type.

More info: The Geoshape query reference

  • field – The field where geopoints are stored

  • shape – A shape definition given in GeoJSON geometry format. More info: The GeoJSON specification (RFC 7946)

  • relation – The relation between the shape and the case property values.


A filter definition


Only return docs missing a value for field

corehq.apps.es.filters.nested(path, filter_)[source]

Query nested documents which normally can’t be queried directly


Only return docs with a real, non-null value for field

corehq.apps.es.filters.range_filter(field, gt=None, gte=None, lt=None, lte=None)[source]

Filter field by a range. Pass in some sensible combination of gt (greater than), gte (greater than or equal to), lt, and lte.
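
The sketch below shows one plausible way the bounds could be turned into an Elasticsearch range clause, and how a date-aware variant might serialize its arguments first. Both function bodies are assumptions written for illustration, not the library's code.

```python
import datetime


def range_filter(field, gt=None, gte=None, lt=None, lte=None):
    """Illustrative sketch: keep only the bounds that were supplied."""
    bounds = {"gt": gt, "gte": gte, "lt": lt, "lte": lte}
    return {"range": {field: {k: v for k, v in bounds.items() if v is not None}}}


def date_range(field, gt=None, gte=None, lt=None, lte=None):
    """Same as range_filter, but serializes date/datetime bounds first."""
    def fmt(value):
        # datetime.datetime is a subclass of datetime.date, so both match.
        return value.isoformat() if isinstance(value, datetime.date) else value
    return range_filter(field, fmt(gt), fmt(gte), fmt(lt), fmt(lte))


opened = date_range("opened_on", gte=datetime.date(2015, 5, 1))
ages = range_filter("age", gt=1, lte=5)
```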

corehq.apps.es.filters.term(field, value)[source]

Filter docs by a field. value can be a single value or a list.

Available Queries

Queries are used for actual searching - things like relevancy scores, Levenshtein distance, and partial matches.

View the elasticsearch documentation to see what other options are available, and put ‘em here if you end up using any of ‘em.

corehq.apps.es.queries.filtered(query, filter_)[source]

Filtered query for performing both filtering and querying at once

corehq.apps.es.queries.geo_distance(field, geopoint, **kwargs)[source]

Filters cases to those within a certain distance of the provided geopoint

eg: geo_distance(‘gps_location’, GeoPoint(-33.1, 151.8), kilometers=100)


No-op query used because a default must be specified

corehq.apps.es.queries.nested(path, query, *args, **kwargs)[source]

Creates a nested query for use with nested documents

Keyword arguments such as score_mode and others can be added.

corehq.apps.es.queries.nested_filter(path, filter_, *args, **kwargs)[source]

Creates a nested query for use with nested documents

Keyword arguments such as score_mode and others can be added.

corehq.apps.es.queries.search_string_query(search_string, default_fields)[source]

All input defaults to doing an infix search for each term. (This may later change to some kind of fuzzy matching).

This is also available via the main ESQuery class.

Aggregate Queries

Aggregations are a replacement for Facets

Here is an example used to calculate how many new pregnancy cases each user has opened in a certain date range.

res = (CaseES()
       .date_range('opened_on', gte=startdate, lte=enddate)
       .aggregation(TermsAggregation('by_user', 'opened_by'))
       .run())  # run() returns the ESQuerySet that exposes .aggregations

buckets = res.aggregations.by_user.buckets

There’s a bit of magic happening here - you can access the raw json data from this aggregation via res.aggregation('by_user') if you’d prefer to skip it.

The res object has an aggregations property, which returns a namedtuple pointing to the wrapped aggregation results. The name provided at instantiation is used here (by_user in this example).

The wrapped aggregation_result object has a result property containing the aggregation data, as well as utilities for parsing that data into something more useful. For example, the TermsAggregation result also has a counts_by_bucket method that returns a {bucket: count} dictionary, which is normally what you want.
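
To make the {bucket: count} idea concrete, here is a small sketch that converts the raw bucket list of a terms aggregation into such a dictionary. The standalone function and the sample data are illustrative assumptions; the `buckets`, `key`, and `doc_count` fields follow the standard Elasticsearch response format.

```python
def counts_by_bucket(raw_aggregation):
    """Turn a raw terms aggregation result into {bucket: count}."""
    return {b["key"]: b["doc_count"] for b in raw_aggregation["buckets"]}


# Shape of a terms aggregation in an Elasticsearch response (sample data).
raw = {
    "buckets": [
        {"key": "user1", "doc_count": 4},
        {"key": "user2", "doc_count": 1},
    ]
}
counts = counts_by_bucket(raw)
print(counts)
```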

As of this writing, there’s not much else developed, but it’s pretty easy to add support for other aggregation types and more results processing.

class corehq.apps.es.aggregations.AggregationRange(start=None, end=None, key=None)[source]

Note that a range includes the “start” value and excludes the “end” value. i.e. start <= X < end

  • start – range start

  • end – range end

  • key – optional key name for the range
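
The half-open interval can be checked with a few lines of code. Both helpers below are hypothetical; `to_range_dict` assumes the range is serialized with the `from`, `to`, and `key` fields that an Elasticsearch range aggregation uses.

```python
def in_range(value, start=None, end=None):
    """True when start <= value < end; a missing bound is unbounded."""
    if start is not None and value < start:
        return False
    if end is not None and value >= end:
        return False
    return True


def to_range_dict(start=None, end=None, key=None):
    # Assumed mapping onto the from/to/key fields of a range aggregation;
    # values that were not supplied are left out.
    pairs = {"from": start, "to": end, "key": key}
    return {k: v for k, v in pairs.items() if v is not None}


low = to_range_dict(start=0, end=10, key="low")
```

Because the end value is excluded, adjacent ranges such as 0 to 10 and 10 to 20 never count the same document twice.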

class corehq.apps.es.aggregations.AggregationTerm(name, field)

field

Alias for field number 1

name

Alias for field number 0

class corehq.apps.es.aggregations.AvgAggregation(name, field)[source]
class corehq.apps.es.aggregations.CardinalityAggregation(name, field)[source]
class corehq.apps.es.aggregations.DateHistogram(name, datefield, interval, timezone=None)[source]

Aggregate by date range. This can answer questions like “how many forms were created each day?”.

  • name – what do you want to call this aggregation

  • datefield – the document’s date field to look at

  • interval – the date interval to use - from DateHistogram.Interval

  • timezone – do bucketing using this time zone instead of UTC

__init__(name, datefield, interval, timezone=None)[source]
class corehq.apps.es.aggregations.ExtendedStatsAggregation(name, field, script=None)[source]

Extended stats aggregation that computes an extended stats aggregation by field

class corehq.apps.es.aggregations.FilterAggregation(name, filter)[source]

Bucket aggregation that creates a single bucket for the specified filter

  • name – aggregation name

  • filter – filter body

__init__(name, filter)[source]
class corehq.apps.es.aggregations.FiltersAggregation(name, filters=None)[source]

Bucket aggregation that creates a bucket for each filter specified using the filter name.


name – aggregation name

__init__(name, filters=None)[source]
add_filter(name, filter)[source]
  • name – filter name

  • filter – filter body

class corehq.apps.es.aggregations.GeoBoundsAggregation(name, field)[source]

A metric aggregation that computes the bounding box containing all geo_point values for a field.

More info: Geo Bounds Aggregation

__init__(name, field)[source]
class corehq.apps.es.aggregations.GeohashGridAggregation(name, field, precision)[source]

A multi-bucket aggregation that groups geo_point and geo_shape values into buckets that represent a grid.

More info: Geohash grid aggregation

__init__(name, field, precision)[source]

Initialize a GeohashGridAggregation

  • name – The name of this aggregation

  • field – The case property that stores a geopoint

  • precision – A value between 1 and 12

High precision geohashes have a long string length and represent cells that cover only a small area (similar to long-format ZIP codes like “02139-4075”).

Low precision geohashes have a short string length and represent cells that each cover a large area (similar to short-format ZIP codes like “02139”).
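
The link between precision and string length can be seen in a short sketch of the standard public geohash encoding. The `encode` function is a hypothetical helper written for illustration; it is not part of corehq.apps.es, and Elasticsearch computes the cells itself.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def encode(lat, lon, precision):
    """Encode a point as a geohash of `precision` characters."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars = []
    bits = 0
    bit_count = 0
    use_lon = True  # bits alternate between longitude and latitude
    while len(chars) < precision:
        rng, value = (lon_range, lon) if use_lon else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2
        if value >= mid:
            bits = (bits << 1) | 1
            rng[0] = mid  # keep the upper half
        else:
            bits = bits << 1
            rng[1] = mid  # keep the lower half
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base-32 character
            chars.append(BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)


coarse = encode(42.6, -5.6, 5)
fine = encode(42.6, -5.6, 8)
```

Each extra character halves the cell several more times, so a higher-precision hash always starts with the lower-precision hash of the cell that contains it.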

class corehq.apps.es.aggregations.MaxAggregation(name, field)[source]
class corehq.apps.es.aggregations.MinAggregation(name, field)[source]

Bucket aggregation that returns the minimum value of a field

  • name – aggregation name

  • field – name of the field to min

class corehq.apps.es.aggregations.MissingAggregation(name, field)[source]

A field data based single bucket aggregation, that creates a bucket of all documents in the current document set context that are missing a field value (effectively, missing a field or having the configured NULL value set).

  • name – aggregation name

  • field – name of the field to bucket on

__init__(name, field)[source]
class corehq.apps.es.aggregations.NestedAggregation(name, path)[source]

A special single bucket aggregation that enables aggregating nested documents.


path – Path to nested document

__init__(name, path)[source]
class corehq.apps.es.aggregations.NestedTermAggregationsHelper(base_query, terms)[source]

Helper to run nested term-based queries (equivalent to SQL group-by clauses). This is not at all related to the ES ‘nested aggregation’. The final aggregation is a count of documents.

Example usage:

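# counting all forms submitted in a domain grouped by app id and user id

NestedTermAggregationsHelper(
    base_query=FormES().domain(domain),  # domain: placeholder for the domain name
    terms=[
        AggregationTerm('app_id', 'app_id'),
        AggregationTerm('user_id', 'form.meta.userID'),
    ])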

This works by bucketing docs first by one terms aggregation, then within that bucket, bucketing further by the next term, and so on. This is then flattened out to appear like a group-by-multiple.
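
The flattening step can be sketched as a recursive walk over nested terms buckets. The `flatten` function and the sample response are assumptions for illustration; the helper's real method names and return type are not shown in this document.

```python
def flatten(aggregations, terms):
    """Flatten nested terms buckets into (key1, key2, ..., doc_count) rows."""
    def walk(node, remaining, prefix):
        name, rest = remaining[0], remaining[1:]
        for bucket in node[name]["buckets"]:
            keys = prefix + (bucket["key"],)
            if rest:
                # Descend into the next terms aggregation inside this bucket.
                yield from walk(bucket, rest, keys)
            else:
                # Innermost level: emit one row with the document count.
                yield keys + (bucket["doc_count"],)
    return list(walk(aggregations, list(terms), ()))


# Sample nested terms aggregation result (made-up data).
response = {
    "app_id": {"buckets": [
        {"key": "app1", "doc_count": 3, "user_id": {"buckets": [
            {"key": "u1", "doc_count": 2},
            {"key": "u2", "doc_count": 1},
        ]}},
        {"key": "app2", "doc_count": 1, "user_id": {"buckets": [
            {"key": "u1", "doc_count": 1},
        ]}},
    ]}
}
rows = flatten(response, ["app_id", "user_id"])
```

Each row reads like one line of a SQL GROUP BY app_id, user_id result.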

__init__(base_query, terms)[source]
class corehq.apps.es.aggregations.RangeAggregation(name, field, ranges=None, keyed=True)[source]

Bucket aggregation that creates one bucket for each range

  • name – the aggregation name

  • field – the field to perform the range aggregations on

  • ranges – list of AggregationRange objects

  • keyed – set to True to have the results returned by key instead of as a list (see RangeResult.normalized_buckets)

__init__(name, field, ranges=None, keyed=True)[source]
class corehq.apps.es.aggregations.StatsAggregation(name, field, script=None)[source]

Stats aggregation that computes a stats aggregation by field

  • name – aggregation name

  • field – name of the field to collect stats on

  • script – an optional field to allow you to script the computed field

__init__(name, field, script=None)[source]
class corehq.apps.es.aggregations.SumAggregation(name, field)[source]

Bucket aggregation that sums a field

  • name – aggregation name

  • field – name of the field to sum

__init__(name, field)[source]
class corehq.apps.es.aggregations.TermsAggregation(name, field, size=None, missing=None)[source]

Bucket aggregation that aggregates by field

  • name – aggregation name

  • field – name of the field to bucket on

  • size

  • missing – define how documents that are missing a value should be treated. By default, they will be ignored. If a value is supplied here it will be used where the value is missing.

__init__(name, field, size=None, missing=None)[source]
class corehq.apps.es.aggregations.TopHitsAggregation(name, field=None, is_ascending=True, size=1, include=None)[source]

A top_hits metric aggregator keeps track of the most relevant document being aggregated. This aggregator is intended to be used as a sub aggregator, so that the top matching documents can be aggregated per bucket.

  • name – Aggregation name

  • field – This is the field to sort the top hits by. If None, defaults to sorting by score.

  • is_ascending – Whether to sort the hits in ascending or descending order.

  • size – The number of hits to include. Defaults to 1.

  • include – An array of fields to include in the hit. Defaults to returning the whole document.

__init__(name, field=None, is_ascending=True, size=1, include=None)[source]
class corehq.apps.es.aggregations.ValueCountAggregation(name, field)[source]


class corehq.apps.es.apps.AppES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

index = 'apps'
class corehq.apps.es.apps.ElasticApp(index_name, type_)[source]
canonical_name = 'apps'
property mapping
property model_cls
settings_key = 'hqapps'


Here’s an example adapted from the case list report - it gets a list of the ids of all unknown users, web users, and demo users on a domain.

from corehq.apps.es import users as user_es

user_filters = [

query = (user_es.UserES()

owner_ids = query.get_ids()
class corehq.apps.es.users.ElasticUser(index_name, type_)[source]
canonical_name = 'users'
property mapping
property model_cls
settings_key = 'hqusers'
class corehq.apps.es.users.UserES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

default_filters = {'active': {'term': {'is_active': True}}, 'not_deleted': {'term': {'base_doc': 'couchuser'}}}
index = 'users'

Include inactive users, which would normally be filtered out.


Return only AdminUsers. Admin users are mock users created from xform submissions with unknown user ids whose username is “admin”.

corehq.apps.es.users.created(gt=None, gte=None, lt=None, lte=None)[source]

Matches users whose username is demo_user

corehq.apps.es.users.domain(domain, allow_enterprise=False)[source]
corehq.apps.es.users.last_logged_in(gt=None, gte=None, lt=None, lte=None)[source]

A user_data property doesn’t exist, or does exist but has an empty string value.


Return only UnknownUsers. Unknown users are mock users created from xform submissions with unknown user ids.

corehq.apps.es.users.user_data(key, value)[source]


Here’s an example getting pregnancy cases that are either still open or were closed after May 1st.

from corehq.apps.es import cases as case_es

q = (case_es.CaseES()
         case_es.closed_range(gte=datetime.date(2015, 5, 1))))
class corehq.apps.es.cases.CaseES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

index = 'cases'
class corehq.apps.es.cases.ElasticCase(index_name, type_)[source]
canonical_name = 'cases'
property mapping
property model_cls
settings_key = 'hqcases'
corehq.apps.es.cases.active_in_range(gt=None, gte=None, lt=None, lte=None)[source]

Restricts cases returned to those with actions during the range

corehq.apps.es.cases.closed_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.modified_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.open_case_aggregation(name='open_case', gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.opened_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.server_modified_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.touched_total_aggregation(gt=None, gte=None, lt=None, lte=None)[source]


class corehq.apps.es.forms.ElasticForm(index_name, type_)[source]
canonical_name = 'forms'
property mapping
property model_cls
settings_key = 'xforms'
class corehq.apps.es.forms.FormES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

default_filters = {'has_domain': {'exists': {'field': 'domain'}}, 'has_user': {'exists': {'field': 'form.meta.userID'}}, 'has_xmlns': {'exists': {'field': 'xmlns'}}, 'is_xform_instance': {'term': {'doc_type': 'xforminstance'}}}
index = 'forms'

Include only archived forms, which are normally excluded

corehq.apps.es.forms.completed(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.forms.submitted(gt=None, gte=None, lt=None, lte=None)[source]

Return only those forms that have case blocks that touch the cases listed in case_ids.



from corehq.apps.es import DomainES

query = (DomainES()
         .created(gte=datespan.startdate, lte=datespan.enddate))
class corehq.apps.es.domains.DomainES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

default_filters = {'not_snapshot': {'bool': {'must_not': {'term': {'is_snapshot': True}}}}}
index = 'domains'

Return only snapshots, which are normally excluded.

class corehq.apps.es.domains.ElasticDomain(index_name, type_)[source]
analysis = {'analyzer': {'comma': {'pattern': '\\s*,\\s*', 'type': 'pattern'}, 'default': {'filter': ['lowercase'], 'tokenizer': 'whitespace', 'type': 'custom'}}}
canonical_name = 'domains'
property mapping
property model_cls
settings_key = 'hqdomains'
corehq.apps.es.domains.created(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.domains.last_modified(gt=None, gte=None, lt=None, lte=None)[source]


class corehq.apps.es.sms.ElasticSMS(index_name, type_)[source]
canonical_name = 'sms'
property mapping
property model_cls
settings_key = 'smslogs'
class corehq.apps.es.sms.SMSES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

index = 'sms'
corehq.apps.es.sms.received(gt=None, gte=None, lt=None, lte=None)[source]