ESQuery

ESQuery is a library for building elasticsearch queries in a friendly, more readable manner.

Basic usage

There should be a file and subclass of ESQuery for each index we have.

Each method returns a new object, so you can chain calls together like SQLAlchemy. Here’s an example usage:

q = (FormES()
     .domain(self.domain)
     .xmlns(self.xmlns)
     .submitted(gte=self.datespan.startdate_param,
                lt=self.datespan.enddate_param)
     .fields(['xmlns', 'domain', 'app_id'])
     .sort('received_on', desc=False)
     .size(self.pagination.count)
     .start(self.pagination.start)
     .terms_aggregation('babies.count', 'babies_saved'))
result = q.run()
total_docs = result.total
hits = result.hits
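
The chaining behaviour described above can be pictured with a small stand-alone sketch. TinyQuery below is a hypothetical stand-in, not the real ESQuery class; it only assumes that each builder method copies the query before changing it, which is one common way to get this SQLAlchemy-like behaviour.

```python
import copy


class TinyQuery:
    """Hypothetical stand-in for an ESQuery-style builder (not the real class)."""

    def __init__(self):
        self._filters = []
        self._size = None

    def _clone(self):
        # Each chained call works on a deep copy, so the original is untouched.
        return copy.deepcopy(self)

    def filter(self, filter_):
        query = self._clone()
        query._filters.append(filter_)
        return query

    def size(self, size):
        query = self._clone()
        query._size = size
        return query


base = TinyQuery().filter({"term": {"domain": "demo"}})
narrowed = base.filter({"term": {"doc_type": "xforminstance"}}).size(10)
```

Because base is never mutated, it can safely be reused as the starting point for several different queries.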

Generally useful filters and queries should be abstracted away for re-use, but you can always add your own like so:

q.filter({"some_arbitrary_filter": {...}})
q.set_query({"fancy_query": {...}})

For debugging or more helpful error messages, you can use query.dumps() and query.pprint(). Both use json.dumps(), and their output is suitable for pasting into ES Head, Marvel, or a similar tool.

Filtering

Filters are implemented as standalone functions, so they can be composed and nested: q.OR(web_users(), mobile_users()). Filters can also be passed to the query.filter method: q.filter(web_users()).

There is some syntactic sugar that lets you skip this boilerplate and call the filter as if it were a method on the query class: q.web_users(). To be available for this shorthand, a filter must be added to the builtin_filters property of the main query class. That is a bit confusing, but it seemed like the best way to make filters available in both contexts.
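
As a rough illustration of how such a shorthand could work, here is a minimal sketch. SugarQuery, the body of web_users(), and the use of __getattr__ are all assumptions made for the example; the real query class may wire up builtin_filters differently.

```python
def term(field, value):
    """Standalone filter: returns a plain dict."""
    return {"term": {field: value}}


def web_users():
    """Hypothetical filter body; the real web_users() may differ."""
    return term("doc_type", "WebUser")


class SugarQuery:
    """Hypothetical stand-in for a query class with a builtin_filters list."""

    builtin_filters = [term, web_users]

    def __init__(self, filters=None):
        self._filters = list(filters or [])

    def filter(self, filter_):
        # Return a new query rather than mutating this one.
        return SugarQuery(self._filters + [filter_])

    def __getattr__(self, name):
        # Reached only when normal attribute lookup fails.
        for fn in self.builtin_filters:
            if fn.__name__ == name:
                return lambda *args, **kwargs: self.filter(fn(*args, **kwargs))
        raise AttributeError(name)


q = SugarQuery()
long_form = q.filter(web_users())   # explicit form
short_form = q.web_users()          # shorthand form
```

Both forms end up with the same filter list, which is the point of the shorthand: the filter functions stay usable on their own and as query methods.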

Generic filters applicable to all indices are available in corehq.apps.es.filters. (Most, if not all, can also be accessed as a query method where appropriate.)

Filtering Specific Indices

There is a file for each elasticsearch index (if not, feel free to add one). This file provides filters specific to that index, as well as an appropriately-directed ESQuery subclass with references to these filters.

These index-specific query classes also have default filters to exclude things like inactive users or deleted docs. These things should nearly always be excluded, but if necessary, you can remove these with remove_default_filters.
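
The idea of named default filters can be sketched as follows. DefaultsQuery is a hypothetical stand-in; the dictionary has the same shape as the default_filters listed for UserES later in this document, and the copy-on-change behaviour is an assumption.

```python
class DefaultsQuery:
    """Hypothetical stand-in showing named default filters."""

    # Same shape as the default_filters shown for UserES in this document.
    default_filters = {
        "active": {"term": {"is_active": True}},
        "not_deleted": {"term": {"base_doc": "couchuser"}},
    }

    def __init__(self):
        # Copy so that removing a default never touches the class-level dict.
        self._default_filters = dict(self.default_filters)

    def remove_default_filter(self, name):
        # Drop a single default, identified by its name.
        query = DefaultsQuery()
        query._default_filters = {
            key: value
            for key, value in self._default_filters.items()
            if key != name
        }
        return query

    def remove_default_filters(self):
        # Drop every default at once.
        query = DefaultsQuery()
        query._default_filters = {}
        return query


q = DefaultsQuery()
with_inactive = q.remove_default_filter("active")
unfiltered = q.remove_default_filters()
```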

Running against production

Since the ESQuery library is read-only, it’s mostly safe to run against production. You can define alternate elasticsearch hosts in your localsettings file in the ELASTICSEARCH_DEBUG_HOSTS dictionary and pass in this host name as the debug_host to the constructor:

>>> CaseES(debug_host='prod').domain('dimagi').count()
120
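
A localsettings entry for this might look like the fragment below. The key matches the debug_host value used above; the host value is a placeholder, and the exact value format your deployment expects is an assumption to check against your own settings.

```python
# localsettings.py (illustrative fragment)
# Maps a short name to an alternate elasticsearch host.
ELASTICSEARCH_DEBUG_HOSTS = {
    'prod': 'es-prod.example.com',  # placeholder host, not a real address
}
```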

Language

  • es_query - the entire query: filters, query, and pagination
  • filters - a list of the individual filters
  • query - the query, used for searching, not filtering
  • field - a field on the document. User docs have a ‘domain’ field.
  • lt/gt - less/greater than
  • lte/gte - less/greater than or equal to
class corehq.apps.es.es_query.ESQuery(index=None, debug_host=None, es_instance_alias='default')[source]

This query builder only outputs the following query structure:

{
    "query": {
        "filtered": {
            "filter": {
                "and": [
                    <filters>
                ]
            },
            "query": <query>
        }
    },
    <size, sort, other params>
}
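
To make the structure above concrete, here is a small sketch that assembles the same shape as a Python dict. build_raw_query is a hypothetical helper, not part of the library; it assumes match_all as the default query and the standard elasticsearch size and from parameters for pagination.

```python
def build_raw_query(filters, query=None, size=None, start=None):
    """Assemble a filtered query in the shape shown above (illustrative only)."""
    raw = {
        "query": {
            "filtered": {
                "filter": {"and": list(filters)},
                # Fall back to a no-op query when none is given.
                "query": query if query is not None else {"match_all": {}},
            }
        }
    }
    if size is not None:
        raw["size"] = size
    if start is not None:
        raw["from"] = start  # elasticsearch calls the offset "from"
    return raw


raw = build_raw_query(
    [{"term": {"domain": "demo"}}, {"term": {"doc_type": "xforminstance"}}],
    size=10,
    start=20,
)
```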
aggregation(aggregation)[source]

Add the passed-in aggregation to the query

builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

count()[source]

Performs a minimal query to get the count of matching documents

dumps(pretty=False)[source]

Returns the JSON query that will be sent to elasticsearch.

exclude_source()[source]

Turn off _source retrieval. Mostly useful if you just want the doc_ids

fields(fields)[source]

Restrict the fields returned from elasticsearch

Deprecated. Use source instead.

filter(filter)[source]

Add the passed-in filter to the query. All filtering goes through this method.

filters

Return a list of the filters used in this query, suitable if you want to reproduce a query with additional filtering.

get_ids()[source]

Performs a minimal query to get the ids of the matching documents

pprint()[source]

Pretty-prints the JSON query that will be sent to elasticsearch.

remove_default_filter(default)[source]

Remove a specific default filter by passing in its name.

remove_default_filters()[source]

Sensible defaults are provided. Use this if you don’t want ‘em

run(include_hits=False)[source]

Actually run the query. Returns an ESQuerySet object.

scroll()[source]

Run the query against the scroll api. Returns an iterator yielding each document that matches the query.

search_string_query(search_string, default_fields=None)[source]

Accepts a user-defined search string

set_query(query)[source]

Set the query. Most stuff we want is better done with filters, but if you actually want Levenshtein distance or prefix querying...

set_sorting_block(sorting_block)[source]

To be used with get_sorting_block, which interprets datatables sorting

size(size)[source]

Restrict the number of results returned. Analogous to SQL limit.

sort(field, desc=False, reset_sort=True)[source]

Order the results by field.

source(include, exclude=None)[source]

Restrict the output of _source in the queryset. This can be used to return an object in a queryset

start(start)[source]

Pagination. Analogous to SQL offset.

values(*fields)[source]

Modeled after Django’s QuerySet.values

class corehq.apps.es.es_query.ESQuerySet(raw, query)[source]
The object returned from ESQuery.run
  • ESQuerySet.raw is the raw response from elasticsearch
  • ESQuerySet.query is the ESQuery object
doc_ids

Return just the doc ids from the response.

hits

Return the docs from the response.

static normalize_result(query, result)[source]

Return the doc from an item in the query response.

total

Return the total number of docs matching the query.
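
The relationship between the raw response and these properties can be sketched as below. TinyQuerySet is a hypothetical stand-in, and the sample response assumes the classic elasticsearch layout in which hits.total is a plain integer and each hit carries _id and _source; the real ESQuerySet may normalize results differently.

```python
raw_response = {
    "hits": {
        "total": 2,
        "hits": [
            {"_id": "abc", "_source": {"domain": "demo", "xmlns": "x1"}},
            {"_id": "def", "_source": {"domain": "demo", "xmlns": "x2"}},
        ],
    }
}


class TinyQuerySet:
    """Hypothetical stand-in for the object returned by run()."""

    def __init__(self, raw):
        self.raw = raw

    @property
    def total(self):
        # Number of matching docs, which can exceed the number of hits returned.
        return self.raw["hits"]["total"]

    @property
    def hits(self):
        return [hit["_source"] for hit in self.raw["hits"]["hits"]]

    @property
    def doc_ids(self):
        return [hit["_id"] for hit in self.raw["hits"]["hits"]]


result = TinyQuerySet(raw_response)
```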

class corehq.apps.es.es_query.HQESQuery(index=None, debug_host=None, es_instance_alias='default')[source]

Query logic specific to CommCareHQ

Available Filters

The following filters are available on any ESQuery instance - you can chain any of these on your query.

Note also that the term filter accepts either a list or a single element. Simple filters which match against a field are based on this filter, so those will also accept lists. That means you can do form_query.xmlns(XMLNS1) or form_query.xmlns([XMLNS1, XMLNS2, ...]).
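
One plausible way for a term filter to accept both forms is sketched below. This is an assumption about the approach, not the library's code: it switches to the elasticsearch terms filter when given a list or tuple.

```python
def term(field, value):
    """Illustrative term filter accepting a single value or a list of values."""
    if isinstance(value, (list, tuple)):
        # Several acceptable values: use the plural "terms" filter.
        return {"terms": {field: list(value)}}
    return {"term": {field: value}}


single = term("xmlns", "http://example.com/form-a")
several = term("xmlns", ["http://example.com/form-a", "http://example.com/form-b"])
```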

Contributing: Additions to this file should be added to the builtin_filters property on either ESQuery or HQESQuery, as appropriate (is it an HQ thing?).

corehq.apps.es.filters.AND(*filters)[source]

Filter docs to match all of the filters passed in

corehq.apps.es.filters.NOT(filter_)[source]

Exclude docs matching the filter passed in

corehq.apps.es.filters.OR(*filters)[source]

Filter docs to match any of the filters passed in
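
Since filters are plain dicts, the three combinators above can be pictured as thin wrappers. The sketch below assumes the "and" and "not" shapes that appear elsewhere in this document and an analogous "or" shape; the actual implementations may differ.

```python
def AND(*filters):
    return {"and": list(filters)}


def OR(*filters):
    return {"or": list(filters)}


def NOT(filter_):
    return {"not": filter_}


is_demo = {"term": {"domain": "demo"}}
is_closed = {"term": {"closed": True}}
is_test = {"term": {"is_test": True}}

# Combinators nest freely because each one just returns another dict.
combined = AND(is_demo, OR(is_closed, NOT(is_test)))
```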

corehq.apps.es.filters.date_range(field, gt=None, gte=None, lt=None, lte=None)[source]

Range filter that accepts datetime objects as arguments

corehq.apps.es.filters.doc_id(doc_id)[source]

Filter by doc_id. Also accepts a list of doc ids

corehq.apps.es.filters.doc_type(doc_type)[source]

Filter by doc_type. Also accepts a list

corehq.apps.es.filters.domain(domain_name)[source]

Filter by domain.

corehq.apps.es.filters.empty(field)[source]

Only return docs with a missing or null value for field

corehq.apps.es.filters.exists(field)[source]

Only return docs which have a value for field

corehq.apps.es.filters.missing(field, exist=True, null=True)[source]

Only return docs missing a value for field

corehq.apps.es.filters.nested(path, filter_)[source]

Query nested documents which normally can’t be queried directly

corehq.apps.es.filters.non_null(field)[source]

Only return docs with a real, non-null value for field

corehq.apps.es.filters.range_filter(field, gt=None, gte=None, lt=None, lte=None)[source]

Filter field by a range. Pass in some sensible combination of gt (greater than), gte (greater than or equal to), lt, and lte.
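
A minimal sketch of how range_filter and date_range might relate is shown below. The function bodies are assumptions written for illustration: only the bounds that were actually passed are included, and date-like values are converted to ISO 8601 strings.

```python
import datetime


def range_filter(field, gt=None, gte=None, lt=None, lte=None):
    """Illustrative range filter: keep only the bounds that were supplied."""
    bounds = {"gt": gt, "gte": gte, "lt": lt, "lte": lte}
    return {
        "range": {
            field: {key: value for key, value in bounds.items() if value is not None}
        }
    }


def date_range(field, gt=None, gte=None, lt=None, lte=None):
    """Illustrative date variant: convert date/datetime bounds to ISO strings."""
    def fmt(value):
        return value.isoformat() if hasattr(value, "isoformat") else value

    return range_filter(field, gt=fmt(gt), gte=fmt(gte), lt=fmt(lt), lte=fmt(lte))


ages = range_filter("age", gte=18, lt=65)
opened = date_range("opened_on", gte=datetime.date(2015, 5, 1))
```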

corehq.apps.es.filters.term(field, value)[source]

Filter docs by a field. ‘value’ can be a singleton or a list.

Available Queries

Queries are used for actual searching - things like relevancy scores, Levenshtein distance, and partial matches.

View the elasticsearch documentation to see what other options are available, and put ‘em here if you end up using any of ‘em.

corehq.apps.es.queries.filtered(query, filter_)[source]

Filtered query for performing both filtering and querying at once

corehq.apps.es.queries.match_all()[source]

No-op query used because a default must be specified

corehq.apps.es.queries.nested(path, query, *args, **kwargs)[source]

Creates a nested query for use with nested documents

Keyword arguments such as score_mode and others can be added.

corehq.apps.es.queries.nested_filter(path, filter_, *args, **kwargs)[source]

Creates a nested query for use with nested documents

Keyword arguments such as score_mode and others can be added.

corehq.apps.es.queries.search_string_query(search_string, default_fields=None)[source]

Allows users to use advanced query syntax, but if search_string does not use the ES query string syntax, default to doing an infix search for each term. (This may later change to some kind of fuzzy matching).

This is also available via the main ESQuery class.

Aggregate Queries

Aggregations are a replacement for Facets.

Here is an example used to calculate how many new pregnancy cases each user has opened in a certain date range.

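from corehq.apps.es.aggregations import TermsAggregation
from corehq.apps.es.cases import CaseES

res = (CaseES()
       .domain(self.domain)
       .case_type('pregnancy')
       .date_range('opened_on', gte=startdate, lte=enddate)
       .aggregation(TermsAggregation('by_user', 'opened_by'))
       .size(0)
       .run())  # run() returns the ESQuerySet that carries the aggregations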

buckets = res.aggregations.by_user.buckets
buckets.user1.doc_count

There’s a bit of magic happening here - you can access the raw json data from this aggregation via res.aggregation('by_user') if you’d prefer to skip it.

The res object has an aggregations property, which returns a namedtuple pointing to the wrapped aggregation results. The name provided at instantiation is used here (by_user in this example).

The wrapped aggregation_result object has a result property containing the aggregation data, as well as utilities for parsing that data into something more useful. For example, the TermsAggregation result also has a counts_by_bucket method that returns a {bucket: count} dictionary, which is normally what you want.
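
For a sense of what such a helper does, here is a stand-alone sketch that turns raw terms-aggregation data into a {bucket: count} dictionary. The function below is a hypothetical free function, not the library's method, and the sample data assumes the usual elasticsearch terms response with key and doc_count on each bucket.

```python
raw_aggregation = {
    "buckets": [
        {"key": "user1", "doc_count": 12},
        {"key": "user2", "doc_count": 3},
    ]
}


def counts_by_bucket(raw):
    """Map each bucket key to its document count (illustrative)."""
    return {bucket["key"]: bucket["doc_count"] for bucket in raw["buckets"]}


counts = counts_by_bucket(raw_aggregation)
```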

As of this writing, there’s not much else developed, but it’s pretty easy to add support for other aggregation types and more results processing.

class corehq.apps.es.aggregations.AggregationRange[source]

Note that a range includes the “start” value and excludes the “end” value. i.e. start <= X < end

Parameters:
  • start – range start
  • end – range end
  • key – optional key name for the range
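
The half-open rule (start <= X < end) is easy to get wrong at the boundaries, so here is a small pure-Python sketch of it. Range and bucket_for are hypothetical names for this example, the ranges are assumed not to overlap, and treating a None bound as open-ended is also an assumption.

```python
from collections import namedtuple

# Hypothetical stand-in with the same three parameters as AggregationRange.
Range = namedtuple("Range", ["start", "end", "key"])


def bucket_for(value, ranges):
    """Return the key of the first range with start <= value < end."""
    for r in ranges:
        above_start = r.start is None or value >= r.start
        below_end = r.end is None or value < r.end
        if above_start and below_end:
            return r.key
    return None


ranges = [
    Range(None, 10, "low"),
    Range(10, 20, "mid"),
    Range(20, None, "high"),
]
```

A value equal to a boundary, such as 10 here, belongs to the range that starts at it rather than the one that ends at it.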
class corehq.apps.es.aggregations.AggregationTerm(name, field)
field

Alias for field number 1

name

Alias for field number 0

class corehq.apps.es.aggregations.DateHistogram(name, datefield, interval, timezone=None)[source]

Aggregate by date range. This can answer questions like “how many forms were created each day?”.

This class can be instantiated by the ESQuery.date_histogram method.

Parameters:
  • name – what do you want to call this aggregation
  • datefield – the document’s date field to look at
  • interval – the date interval to use: “year”, “quarter”, “month”, “week”, “day”, “hour”, “minute”, “second”
  • timezone – do bucketing using this time zone instead of UTC
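
Conceptually, a date histogram with a "day" interval does the same thing as the local computation below. This is only a pure-Python analogy with made-up timestamps; it does not call elasticsearch and ignores time zones.

```python
import datetime
from collections import Counter

received_on = [
    datetime.datetime(2015, 5, 1, 9, 30),
    datetime.datetime(2015, 5, 1, 17, 5),
    datetime.datetime(2015, 5, 3, 8, 0),
]

# interval="day": truncate each timestamp to its date, then count per date.
per_day = Counter(stamp.date() for stamp in received_on)
```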
class corehq.apps.es.aggregations.ExtendedStatsAggregation(name, field, script=None)[source]

Computes an extended stats aggregation on the given field

class corehq.apps.es.aggregations.FilterAggregation(name, filter)[source]

Bucket aggregation that creates a single bucket for the specified filter

Parameters:
  • name – aggregation name
  • filter – filter body
class corehq.apps.es.aggregations.FiltersAggregation(name, filters=None)[source]

Bucket aggregation that creates a bucket for each filter specified using the filter name.

Parameters:
  • name – aggregation name
add_filter(name, filter)[source]
Parameters:
  • name – filter name
  • filter – filter body
class corehq.apps.es.aggregations.MinAggregation(name, field)[source]

Metric aggregation that returns the minimum value of a field

Parameters:
  • name – aggregation name
  • field – name of the field to min
class corehq.apps.es.aggregations.MissingAggregation(name, field)[source]

A field data based single bucket aggregation, that creates a bucket of all documents in the current document set context that are missing a field value (effectively, missing a field or having the configured NULL value set).

Parameters:
  • name – aggregation name
  • field – name of the field to bucket on
class corehq.apps.es.aggregations.NestedAggregation(name, path)[source]

A special single bucket aggregation that enables aggregating nested documents.

Parameters:
  • path – Path to nested document
class corehq.apps.es.aggregations.NestedTermAggregationsHelper(base_query, terms, inner_most_aggregation=None)[source]

Helper to run nested term-based queries (equivalent to SQL group-by clauses). This is not at all related to the ES ‘nested aggregation’. The final aggregation defaults to a count of documents, though can also be used to sum a different field of the document.

Example usage:

# counting all forms submitted in a domain grouped by app id and user id
NestedTermAggregationsHelper(
    base_query=FormES().domain(domain_name),
    terms=[
        AggregationTerm('app_id', 'app_id'),
        AggregationTerm('user_id', 'form.meta.userID'),
    ]
).get_data()

# summing the balances of ledger values, grouped by the entry id
NestedTermAggregationsHelper(
    base_query=LedgerES().domain(domain).section(section_id),
    terms=[
        AggregationTerm('entry_id', 'entry_id'),
    ],
    inner_most_aggregation=SumAggregation('balance', 'balance'),
).get_data()
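
To illustrate the group-by analogy, the first example corresponds roughly to the local computation below. The sample documents are made up, and the exact shape of what get_data() returns is not shown here; this only demonstrates the grouping being described.

```python
from collections import Counter

forms = [
    {"app_id": "app1", "user_id": "u1"},
    {"app_id": "app1", "user_id": "u1"},
    {"app_id": "app1", "user_id": "u2"},
    {"app_id": "app2", "user_id": "u1"},
]

# Roughly: SELECT app_id, user_id, COUNT(*) ... GROUP BY app_id, user_id
counts = Counter((form["app_id"], form["user_id"]) for form in forms)
```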

class corehq.apps.es.aggregations.RangeAggregation(name, field, ranges=None, keyed=True)[source]

Bucket aggregation that creates one bucket for each range

Parameters:
  • name – the aggregation name
  • field – the field to perform the range aggregations on
  • ranges – list of AggregationRange objects
  • keyed – set to True to have the results returned by key instead of as a list (see RangeResult.normalized_buckets)
class corehq.apps.es.aggregations.StatsAggregation(name, field, script=None)[source]

Computes a stats aggregation on the given field

Parameters:
  • name – aggregation name
  • field – name of the field to collect stats on
  • script – an optional field to allow you to script the computed field
class corehq.apps.es.aggregations.SumAggregation(name, field)[source]

Metric aggregation that sums a field

Parameters:
  • name – aggregation name
  • field – name of the field to sum
class corehq.apps.es.aggregations.TermsAggregation(name, field, size=None)[source]

Bucket aggregation that aggregates by field

Parameters:
  • name – aggregation name
  • field – name of the field to bucket on
  • size
class corehq.apps.es.aggregations.TopHitsAggregation(name, field=None, is_ascending=True, size=1, include=None)[source]

A top_hits metric aggregator keeps track of the most relevant document being aggregated. This aggregator is intended to be used as a sub aggregator, so that the top matching documents can be aggregated per bucket.

Parameters:
  • name – Aggregation name
  • field – This is the field to sort the top hits by. If None, defaults to sorting by score.
  • is_ascending – Whether to sort the hits in ascending or descending order.
  • size – The number of hits to include. Defaults to 1.
  • include – An array of fields to include in the hit. Defaults to returning the whole document.

AppES

class corehq.apps.es.apps.AppES(index=None, debug_host=None, es_instance_alias='default')[source]
builtin_filters
index = 'apps'
corehq.apps.es.apps.cloudcare_enabled(cloudcare_enabled)[source]
corehq.apps.es.apps.created_from_template(from_template=True)[source]
corehq.apps.es.apps.is_build(build=True)[source]
corehq.apps.es.apps.is_released(released=True)[source]
corehq.apps.es.apps.uses_case_sharing(case_sharing=True)[source]

UserES

Here’s an example adapted from the case list report - it gets a list of the ids of all unknown users, web users, and demo users on a domain.

from corehq.apps.es import users as user_es

user_filters = [
    user_es.unknown_users(),
    user_es.web_users(),
    user_es.demo_users(),
]

query = (user_es.UserES()
         .domain(self.domain)
         .OR(*user_filters)
         .show_inactive())

owner_ids = query.get_ids()
class corehq.apps.es.users.UserES(index=None, debug_host=None, es_instance_alias='default')[source]
builtin_filters
default_filters = {'active': {'term': {'is_active': True}}, 'not_deleted': {'term': {'base_doc': 'couchuser'}}}
index = 'users'
show_inactive()[source]

Include inactive users, which would normally be filtered out.

show_only_inactive()[source]
corehq.apps.es.users.admin_users()[source]

Return only AdminUsers. Admin users are mock users created from xform submissions with unknown user ids whose username is “admin”.

corehq.apps.es.users.analytics_enabled(enabled=True)[source]
corehq.apps.es.users.created(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.users.demo_users()[source]

Matches users whose username is demo_user

corehq.apps.es.users.domain(domain)[source]
corehq.apps.es.users.is_practice_user(practice_mode=True)[source]
corehq.apps.es.users.last_logged_in(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.users.location(location_id)[source]
corehq.apps.es.users.mobile_users()[source]
corehq.apps.es.users.primary_location(location_id)[source]
corehq.apps.es.users.unknown_users()[source]

Return only UnknownUsers. Unknown users are mock users created from xform submissions with unknown user ids.

corehq.apps.es.users.user_ids(user_ids)[source]
corehq.apps.es.users.username(username)[source]
corehq.apps.es.users.web_users()[source]

CaseES

Here’s an example getting pregnancy cases that are either still open or were closed on or after May 1st, 2015.

import datetime

from corehq.apps.es import cases as case_es

q = (case_es.CaseES()
     .domain('testproject')
     .case_type('pregnancy')
     .OR(case_es.is_closed(False),
         case_es.closed_range(gte=datetime.date(2015, 5, 1))))
class corehq.apps.es.cases.CaseES(index=None, debug_host=None, es_instance_alias='default')[source]
builtin_filters
index = 'cases'
corehq.apps.es.cases.active_in_range(gt=None, gte=None, lt=None, lte=None)[source]

Restricts cases returned to those with actions during the range

corehq.apps.es.cases.case_ids(case_ids)[source]
corehq.apps.es.cases.case_type(type_)[source]
corehq.apps.es.cases.closed_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.is_closed(closed=True)[source]
corehq.apps.es.cases.modified_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.open_case_aggregation(name='open_case', gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.opened_by(user_id)[source]
corehq.apps.es.cases.opened_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.owner(owner_id)[source]
corehq.apps.es.cases.owner_type(owner_type)[source]
corehq.apps.es.cases.server_modified_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.touched_total_aggregation(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.user(user_id)[source]
corehq.apps.es.cases.user_ids_handle_unknown(user_ids)[source]

FormES

class corehq.apps.es.forms.FormES(index=None, debug_host=None, es_instance_alias='default')[source]
builtin_filters
completed_histogram(timezone=None)[source]
default_filters = {'has_domain': {'not': {'missing': {'field': 'domain'}}}, 'is_xform_instance': {'term': {'doc_type': 'xforminstance'}}, 'has_xmlns': {'not': {'missing': {'field': 'xmlns'}}}, 'has_user': {'not': {'missing': {'field': 'form.meta.userID'}}}}
domain_aggregation()[source]
index = 'forms'
only_archived()[source]

Include only archived forms, which are normally excluded

submitted_histogram(timezone=None)[source]
user_aggregation()[source]
corehq.apps.es.forms.app(app_ids)[source]
corehq.apps.es.forms.completed(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.forms.j2me_submissions(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.forms.submitted(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.forms.user_id(user_ids)[source]
corehq.apps.es.forms.user_ids_handle_unknown(user_ids)[source]
corehq.apps.es.forms.user_type(user_types)[source]
corehq.apps.es.forms.xmlns(xmlnss)[source]

DomainES

Here’s an example generating a histogram of domain creations (that’s a type of faceted query), filtered by a provided list of domains and a report date range.

from corehq.apps.es import DomainES

domains_after_date = (DomainES()
                      .in_domains(domains)
                      .created(gte=datespan.startdate, lte=datespan.enddate)
                      .date_histogram('date', 'date_created', interval)
                      .size(0))
histo_data = domains_after_date.run().aggregations.date.buckets_list
class corehq.apps.es.domains.DomainES(index=None, debug_host=None, es_instance_alias='default')[source]
builtin_filters
default_filters = {'not_snapshot': {'not': {'term': {'is_snapshot': True}}}}
index = 'domains'
only_snapshots()[source]

Return only snapshots, which are normally excluded.

corehq.apps.es.domains.commcare_domains()[source]
corehq.apps.es.domains.commconnect_domains()[source]
corehq.apps.es.domains.commtrack_domains()[source]
corehq.apps.es.domains.created(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.domains.created_by_user(creating_user)[source]
corehq.apps.es.domains.in_domains(domains)[source]
corehq.apps.es.domains.incomplete_domains()[source]
corehq.apps.es.domains.is_active(is_active=True)[source]
corehq.apps.es.domains.is_active_project(is_active=True)[source]
corehq.apps.es.domains.last_modified(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.domains.non_test_domains()[source]
corehq.apps.es.domains.real_domains()[source]

SMSES

class corehq.apps.es.sms.SMSES(index=None, debug_host=None, es_instance_alias='default')[source]
builtin_filters
index = 'sms'
user_aggregation()[source]
corehq.apps.es.sms.direction(direction_)[source]
corehq.apps.es.sms.incoming_messages()[source]
corehq.apps.es.sms.outgoing_messages()[source]
corehq.apps.es.sms.processed(processed=True)[source]
corehq.apps.es.sms.processed_or_incoming_messages()[source]
corehq.apps.es.sms.received(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.sms.to_commcare_case()[source]
corehq.apps.es.sms.to_commcare_user()[source]
corehq.apps.es.sms.to_commcare_user_or_case()[source]
corehq.apps.es.sms.to_couch_user()[source]
corehq.apps.es.sms.to_web_user()[source]