Elasticsearch

Overview

Indexes

We have indexes for each of the following doc types:
  • Applications - hqapps

  • Cases - hqcases

  • Domains - hqdomains

  • Forms - xforms

  • Groups - hqgroups

  • Users - hqusers

  • SMS logs - smslogs

  • Case Search - case_search

Each index has a corresponding mapping file in corehq/pillows/mappings/. Each mapping has a hash that reflects the current state of the mapping. This can just be a random alphanumeric string. The hash is appended to the index name so the index is called something like xforms_1cce1f049a1b4d864c9c25dc42648a45. Each type of index has an alias with the short name, so you should normally be querying just xforms, not the fully specified index+hash. All of HQ code except the index maintenance code uses aliases to read and write data to indices.

Whenever the mapping is changed, this hash should be updated. That will trigger the creation of a new index on deploy (by the $ ./manage.py ptop_preindex command). Once the new index is finished, the alias is flipped ($ ./manage.py ptop_es_manage --flip_all_aliases) to point to the new index, allowing for a relatively seamless transition.
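As a rough illustration of the naming scheme and the alias flip described above, the sketch below builds a full index name from an alias and a hash, and then builds the request body for an atomic alias swap in the shape used by the Elasticsearch index aliases API. The helper names and the "xforms_oldhash" index are hypothetical; the internals of ptop_es_manage are not shown in this document and may differ.

```python
def full_index_name(alias, mapping_hash):
    """Join the short alias and the mapping hash, e.g. xforms_<hash>."""
    return f"{alias}_{mapping_hash}"


def alias_flip_body(alias, old_index, new_index):
    """Build an _aliases request body that moves `alias` in one atomic call."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }


new_index = full_index_name("xforms", "1cce1f049a1b4d864c9c25dc42648a45")
# "xforms_oldhash" is a made-up name for the index being replaced
body = alias_flip_body("xforms", "xforms_oldhash", new_index)
```

Because both actions travel in a single request, readers of the alias never see a moment where it points at no index.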

Keeping indexes up-to-date

Pillowtop looks at the changes feed from couch and listens for any relevant new/changed docs. In order to have your changes appear in elasticsearch, pillowtop must be running:

$ ./manage.py run_ptop --all

You can also run a once-off reindex for a specific index:

$ ./manage.py ptop_reindexer_v2 user

Changing a mapping or adding data

If you’re adding additional data to elasticsearch, you’ll need to modify that index’s mapping file in order to be able to query on that new data.

Adding data to an index

Each pillow has a function or class that takes in the raw document dictionary and transforms it into the document that gets sent to ES. If, for example, you wanted to store username in addition to user_id on cases in elastic, you’d add username to corehq.pillows.mappings.case_mapping, then modify the transform_case_for_elasticsearch function to do the appropriate lookup. It accepts a doc_dict for the case doc and is expected to return a doc_dict, so just add the username to that.
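The shape of such a transform can be sketched as follows. This is a minimal, standalone illustration and not the actual transform_case_for_elasticsearch code: the function name add_username_to_case and the lookup_username callable are hypothetical stand-ins for whatever lookup HQ would really use.

```python
import copy


def add_username_to_case(doc_dict, lookup_username):
    """Return a copy of the case doc_dict with a username field added.

    `lookup_username` is a hypothetical callable mapping user_id -> username
    (returning None when the user is unknown).
    """
    doc = copy.deepcopy(doc_dict)  # avoid mutating the caller's document
    user_id = doc.get("user_id")
    doc["username"] = lookup_username(user_id) if user_id else None
    return doc


# Stand-in data; a real lookup would hit the users store.
usernames = {"abc123": "worker1"}
case_doc = {"_id": "case-1", "user_id": "abc123"}
es_doc = add_username_to_case(case_doc, usernames.get)
```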

Building the new index

Once you’ve made the change, you’ll need to build a new index which uses that new mapping. Updating the index name in the mapping file triggers HQ to create the new index with the new mapping and reindex all data, so you’ll have to update the index hash and alias at the top of the mapping file. The hash suffix of the index name can be any random alphanumeric string; by convention it is usually the date of the edit. The alias should also be updated to a new one of the format xforms_<date-modified> (the date is just by convention), so that production operations continue to use the old alias pointing to the existing index. This will trigger a preindex as outlined in the Indexes section. In subsequent commits the alias can be flipped back to what it was, for example xforms. Changing the alias name doesn’t trigger a reindex.

Updating indexes in a production environment

Updates in a production environment should be done in the following steps, so as not to show incomplete data.

  1. Set up a release of your branch using cchq <env> setup_limited_release:keep_days=n_days

  2. In your release directory, kick off an index build using ./manage.py ptop_preindex

  3. Verify that the reindex has completed successfully. This is a weak point in our current migration process.

    • This can be done by using ES head or the ES APIs to compare document counts to the previous index.

    • You should also actively look for errors in the output of the ptop_preindex command that was run.

  4. Merge your PR and deploy your latest master branch.
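The document-count comparison in step 3 can be sketched as below. This is only an illustration under assumptions: count_docs is any callable that returns the document count for an index name (for example a thin wrapper around the ES count API), and the tolerance parameter is an invented convenience for indexes that are still receiving writes.

```python
def compare_doc_counts(count_docs, old_index, new_index, tolerance=0):
    """Return (ok, old_count, new_count) for two indexes.

    `count_docs` is assumed to take an index name and return an int.
    `tolerance` is the largest count difference still treated as a match.
    """
    old_count = count_docs(old_index)
    new_count = count_docs(new_index)
    ok = abs(old_count - new_count) <= tolerance
    return ok, old_count, new_count


# Stand-in counts; in practice these would come from the cluster.
fake_counts = {"xforms_old": 1000, "xforms_new": 998}
result = compare_doc_counts(
    fake_counts.__getitem__, "xforms_old", "xforms_new", tolerance=5
)
```

Matching counts do not prove that the documents themselves match, so this check complements rather than replaces reading the ptop_preindex output for errors.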

How to un-bork your broken indexes

Sometimes things get in a weird state and (locally!) it’s easiest to just blow away the index and start over.

  1. Delete the affected index. The easiest way to do this is with elasticsearch-head. You can delete multiple affected indices with curl -X DELETE http://localhost:9200/*. The * can be replaced with any wildcard pattern to delete the matching indices, similar to bash globbing.

  2. Run $ ./manage.py ptop_preindex && ./manage.py ptop_es_manage --flip_all_aliases.

  3. Try again

Querying Elasticsearch - Best Practices

Here are the most basic things to know if you want to write readable and reasonably performant code for accessing Elasticsearch.

Use ESQuery when possible

Check out Querying Elasticsearch

  • Prefer the cleaner .count(), .values(), .values_list(), etc. execution methods to the more low-level .run().hits, .run().total, etc. With the latter it’s easier to make mistakes and fall into anti-patterns, and the code is harder to read.

  • Prefer adding filter methods to using set_query() unless you really know what you’re doing and are willing to make your code more error prone and difficult to read.

Prefer scroll queries

Use a scroll query when fetching lots of records.
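To show what a scroll query does underneath, here is a minimal sketch of a scroll loop. It assumes a client object exposing search(), scroll() and clear_scroll() with the keyword arguments used by the elasticsearch-py client; FakeClient is an in-memory stand-in so the sketch runs without a cluster. In HQ code, prefer the ESQuery helpers over hand-rolling this.

```python
def scroll_hits(client, index, body, scroll="2m", size=500):
    """Yield every hit for `body`, fetching one page per request."""
    page = client.search(index=index, body=body, scroll=scroll, size=size)
    scroll_id = page.get("_scroll_id")
    try:
        while page["hits"]["hits"]:
            yield from page["hits"]["hits"]
            page = client.scroll(scroll_id=scroll_id, scroll=scroll)
            scroll_id = page.get("_scroll_id", scroll_id)
    finally:
        # Release the server-side scroll context when done (or on error).
        if scroll_id:
            client.clear_scroll(scroll_id=scroll_id)


class FakeClient:
    """In-memory stand-in that pages through a list of documents."""

    def __init__(self, docs):
        self._docs = docs
        self._pos = 0
        self._size = 0
        self.cleared = False

    def _page(self):
        hits = self._docs[self._pos:self._pos + self._size]
        self._pos += len(hits)
        return {"_scroll_id": "fake-scroll", "hits": {"hits": hits}}

    def search(self, index, body, scroll, size):
        self._pos, self._size = 0, size
        return self._page()

    def scroll(self, scroll_id, scroll):
        return self._page()

    def clear_scroll(self, scroll_id):
        self.cleared = True


client = FakeClient([{"_id": str(i)} for i in range(5)])
query = {"query": {"match_all": {}}}
ids = [hit["_id"] for hit in scroll_hits(client, "hqcases", query, size=2)]
```

Each request returns at most size documents, so memory use stays bounded no matter how many records match.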

Prefer filter to query

If you don’t need relevance ranking, use filter rather than query.
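For illustration only, this is roughly what a filter-context request body looks like when written by hand. The field names (domain, doc_type) are examples, and the bool/filter syntax assumes Elasticsearch 2.0 or later. In HQ code the ESQuery filter methods build this for you.

```python
def filtered_body(domain, doc_type):
    """Request body that matches in filter context, so no scores are computed."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"domain": domain}},
                    {"term": {"doc_type": doc_type}},
                ]
            }
        }
    }


body = filtered_body("demo", "CommCareCase")
```

Clauses under filter only answer yes or no for each document, which lets Elasticsearch skip scoring and cache the result.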

Use size(0) with aggregations

Use size(0) when you’re only doing aggregations; otherwise you’ll get back doc bodies as well! Sometimes that’s just abstractly wasteful, but often it can be a serious performance hit for the operation as well as the cluster.

The best way to do this is by using helpers like ESQuery’s .count() that know to do this for you. Your code will look better and you won’t have to remember to check for that every time. (If you ever find helpers not doing this correctly, then it’s definitely worth fixing.)
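As a sketch of what size(0) amounts to at the request level, the helper below builds an aggregation-only body. The function name, the "by_type" aggregation name and the field names are illustrative assumptions, not HQ code.

```python
def aggregation_only_body(filters, agg_name, field):
    """Request body that returns aggregation buckets but no document hits."""
    return {
        "size": 0,  # skip document bodies entirely
        "query": {"bool": {"filter": list(filters)}},
        "aggs": {agg_name: {"terms": {"field": field}}},
    }


body = aggregation_only_body(
    [{"term": {"domain": "demo"}}], agg_name="by_type", field="type"
)
```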


Elasticsearch App

Elasticsearch Index Management

CommCare HQ data in Elasticsearch is integral to core application functionality. The degree to which the application relies on Elasticsearch data varies from index to index. Currently, Elasticsearch contains both authoritative data (for example the @indexed_on case property and UnknownUser user records) and data used for real-time application logic (the users index, for example).

In order to guarantee stability (or “manageability”, if you will) of this core data, it is important that Elasticsearch indexes are maintained in a consistent state across all environments as a concrete design feature of CommCare HQ. This design constraint is accomplished by managing Elasticsearch index modifications (for example: creating indexes, updating index mappings, etc) exclusively through Django’s migration framework. This ensures that all Elasticsearch index modifications will be part of standard CommCare HQ code deployment procedures, thereby preventing Elasticsearch index state drift between maintained CommCare HQ deployments.

One or more migrations are required any time the following Elasticsearch state configurations are changed in code:

  • index names

  • index aliases

  • analyzers

  • mappings

  • tuning parameters

Elasticsearch allows changing an index’s number_of_replicas tuning parameter on a live index. In the future, the configuration settings (i.e. “live state”) of that value should be removed from the CommCare HQ codebase entirely in order to decouple it from application logic.

Creating Elasticsearch Index Migrations

Like Django Model migrations, Elasticsearch index migrations can be quite verbose. To aid in creating these migrations, there is a Django manage command that can generate migration files for Elasticsearch index operations. Since the Elasticsearch index state is not a Django model, Django’s model migration framework cannot automatically determine what operations need to be included in a migration, or even when a new migration is required. This is why creating these migrations is a separate command and not integrated into the default makemigrations command.

To create a new Elasticsearch index migration, use the make_elastic_migration management command and provide details for the required migration operations via any combination of the -c/--create, -u/--update and/or -d/--delete command line options.

Similar to Django model migrations, this management command uses the index metadata (mappings, analysis, etc) from the existing Elasticsearch code, so it is important that this command is executed after making changes to index metadata. To provide an example, consider a hypothetical scenario where the following index changes are needed:

  • create a new users index

  • update the mapping on the existing groups index to add a new property named pending_users

  • delete the existing index named groups-sandbox

After the new property has been added to the groups index mapping in code, the following management command would create a migration file (e.g. corehq/apps/es/migrations/0003_groups_pending_users.py) for the necessary operations:

./manage.py make_elastic_migration --name groups_pending_users -c users -u groups:pending_users -d groups-sandbox

Updating Elastic Index Mappings

Prior to the UpdateIndexMapping migration operation implementation, Elastic mappings were always applied “in full” any time a mapping change was needed. That is: the entire mapping (from code) was applied to the existing index via the Put Mapping API. This technique had some pros and cons:

  • Pro: the mapping update logic in code was simple because it did not have to worry about which existing mapping properties are persistent (persist on the index even if omitted in a PUT request payload) and which ones are volatile (effectively “unset” if omitted in a PUT request payload).

  • Con: it requires that all mapping properties are explicitly set on every mapping update, making mapping updates impossible if the existing index mapping in Elasticsearch has diverged from the mapping in code.

Because CommCare HQ Elastic mappings have been able to drift between environments, it is no longer possible to update some index mappings using the historical technique. On some indexes, the live index mappings have sufficiently diverged that there is no common, “full mapping definition” that can be applied on all environments. This means that in order to push mapping changes to all environments, new mapping update logic is needed which is capable of updating individual properties on an Elastic index mapping while leaving other (existing) properties unchanged.

The UpdateIndexMapping migration operation adds this capability. Due to the complex behavior of the Elasticsearch “Put Mapping” API, this implementation is limited to only support changing the mapping _meta and properties items. Changing other mapping properties (e.g. date_detection, dynamic, etc) is not yet implemented. However, the current implementation does ensure that the existing values are retained (unchanged). Historically, these values are rarely changed, so this limitation does not hinder any kind of routine maintenance operations. Implementing the ability to change the other properties will be a simple task when there is a clear definition of how that functionality needs to work, for example: when a future feature/change requires changing these properties for a specific reason.
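The intended effect of a partial mapping update can be modelled in memory as follows. This is a hedged sketch of the idea only: it is not the UpdateIndexMapping implementation, it does not call the Put Mapping API, and the function name and sample mapping are invented for illustration.

```python
import copy


def apply_partial_mapping_update(live_mapping, properties=None, meta=None):
    """Return a new mapping with only `properties` and `_meta` changed.

    Every other top-level item (date_detection, dynamic, ...) is carried
    over from `live_mapping` untouched.
    """
    updated = copy.deepcopy(live_mapping)
    if properties:
        updated.setdefault("properties", {}).update(copy.deepcopy(properties))
    if meta:
        updated.setdefault("_meta", {}).update(copy.deepcopy(meta))
    return updated


# Made-up live mapping that may have drifted from the mapping in code.
live = {
    "dynamic": False,
    "date_detection": False,
    "properties": {"name": {"type": "string"}},
}
new = apply_partial_mapping_update(
    live, properties={"pending_users": {"type": "string"}}
)
```

Only the named property is added; settings such as dynamic keep whatever value the live index already had.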

Comparing Mappings In Code Against Live Indexes

When modifying mappings for an existing index, it can be useful to compare the new mapping (as defined in code) to the live index mappings in Elasticsearch on a CommCare HQ deployment. This is possible by dumping the mappings of interest into local files and comparing them with a diff utility. The print_elastic_mappings Django manage command makes this process relatively easy. Minimally, this can be accomplished in as few as three steps:

  1. Export the local code mapping into a new file.

  2. Export the mappings from a deployed environment into a local file.

  3. Compare the two files.

In practice, this might look like the following example:

./manage.py print_elastic_mappings sms --no-names > ./sms-in-code.py
cchq <env> django-manage print_elastic_mappings smslogs_2020-01-28:sms --no-names > ./sms-live.py
diff -u ./sms-live.py ./sms-in-code.py

Elastic Index Tuning Configurations

CommCare HQ provides a mechanism for individual deployments (environments) to tune the performance characteristics of their Elasticsearch indexes via Django settings. This mechanism can be used by defining an ES_SETTINGS dictionary in localsettings.py (or by configuring the requisite Elasticsearch parameters in a CommCare Cloud environment). Tuning parameters can be specified in one of two ways:

  1. “default”: configures the tuning settings for all indexes in the environment.

  2. index identifier: configures the tuning settings for a specific index in the environment – these settings take precedence over “default” settings.

For example, if an environment wishes to explicitly configure the “case_search” index with six shards, and all others with only three, the configuration could be specified in localsettings.py as:

ES_SETTINGS = {
    "default": {"number_of_shards": 3},
    "case_search": {"number_of_shards": 6},
}

Configuring a tuning setting with the special value None will result in that configuration item being reset to the Elasticsearch cluster default (unless superseded by another setting with higher precedence). Refer to the corehq/apps/es/index/settings.py file for the full details regarding what items (index and tuning settings values) are configurable, as well as what default tuning settings will be used when not customized by the environment.
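The precedence rules above can be sketched as a small resolver. This is an illustration under assumptions rather than the logic in the settings module: the function name is hypothetical, the code_defaults values are made up, and "reset to the cluster default" is modelled by simply omitting the key from the result.

```python
def resolve_tuning_settings(code_defaults, es_settings, index_id):
    """Layer tuning settings: code defaults < "default" < index-specific.

    A value of None means "use the Elasticsearch cluster default",
    modelled here by leaving the key out of the result.
    """
    merged = dict(code_defaults)
    merged.update(es_settings.get("default", {}))
    merged.update(es_settings.get(index_id, {}))
    return {key: value for key, value in merged.items() if value is not None}


ES_SETTINGS = {
    "default": {"number_of_shards": 3, "number_of_replicas": None},
    "case_search": {"number_of_shards": 6},
}
# Made-up values standing in for the defaults defined in code.
code_defaults = {"number_of_shards": 5, "number_of_replicas": 2}

case_search = resolve_tuning_settings(code_defaults, ES_SETTINGS, "case_search")
users = resolve_tuning_settings(code_defaults, ES_SETTINGS, "users")
```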

Important note: These Elasticsearch index tuning settings are not “live”. That is: changing their values on a deployed environment will not have any immediate effect on live indexes in Elasticsearch. Instead, these values are only ever used when an index is created (for example, during a fresh CommCare HQ installation or when an existing index is reindexed into a new one). This means that making new values become “live” involves an index migration and reindex, which requires changes in the CommCare HQ codebase.

Adapter Design

The HQ Elastic adapter design came about due to the need for reindexing Elasticsearch indexes in a way that is transparent to parts of HQ that write to Elasticsearch (e.g. pillowtop). Reindexing is necessary for making changes to index mappings, is a prerequisite to upgrading an Elasticsearch cluster, and is also needed for changing low-level index configurations (e.g. sharding).

There is an existing procedure draft that documents the steps that were used on one occasion to reindex the case_search index. This procedure leveraged a custom pillow to “backfill” the cloned index (i.e. initially populated using Elasticsearch Reindex API). That procedure only works for a subset of HQ Elasticsearch indexes, and is too risky to be considered as an ongoing Elastic maintenance strategy. There are several key constraints that an HQ reindexing procedure should meet which the existing procedure does not:

  • simple and robust

  • performed with standard maintenance practices

  • provides the ability to test and verify the integrity of a new index before it is too late to be rejected

  • allows HQ Elasticsearch index state to remain decoupled from the commcare-cloud codebase

  • is not disruptive – does not prohibit any other kind of standard maintenance that might come up while the operation is underway

  • is “fire and forget” – does not require active polling of intermediate state in order to progress the overall operation

  • is practical for third party HQ hosters to use

One way to accomplish these constraints is to implement an “index multiplexing” feature in HQ, where Elasticsearch write operations are duplicated across two indexes. This design facilitates maintaining two up-to-date versions of any index (a primary read/write index and a secondary write-only index), allowing HQ to run in a “normal” state (i.e. not a custom “maintenance” state) while providing the ability to switch back and forth (swapping primary and secondary) before fully committing to abandoning one of them. Creating a copy of an index is the unavoidable nature of a reindex operation, and multiplexing allows safe switching from one to the other without causing disruptions or outages while keeping both up-to-date.

The least disruptive way to accomplish a multiplexing design is with an adapter layer that operates between the low-level third party Elasticsearch Python client library and high-level HQ components which need to read/write data in an Elasticsearch index. HQ already has the initial framework for this layer (the ElasticsearchInterface class), so the adapter layer is not a new concept. The reason that the ElasticsearchInterface implementation cannot be modified in-place to accommodate multiplexing is because it is the wrong level of abstraction. The ElasticsearchInterface abstraction layer was designed as an Elasticsearch version abstraction. It provides a common set of functions and methods so that the high-level HQ “consumer” that uses it can interact with Elasticsearch documents without knowing which Elasticsearch version is on the backend. It is below “index-level” logic, and does not implement index-specific functionality needed in order for some indexes to be handled differently than others (e.g. some indexes are indexed individually while others are multiplexed). The document adapter implementation is a document abstraction layer. It provides a common set of functions and methods to allow high-level HQ code to perform Elasticsearch operations at the document level, allowing unique adapters to handle their document operations differently from index to index.

With a multiplexing adapter layer, reindexing an Elasticsearch index can be as few as four concise steps, none of which are time-critical in respect to each other:

  1. Merge and deploy a PR that configures multiplexing on an index.

  2. Execute an idempotent management command that updates the secondary index from its primary counterpart.

  3. Merge and deploy a PR that disables multiplexing for the index (now using only the new index).

  4. Execute a management command to delete the old index.

Note: the above steps are not limited to a single index at a time. That is, the implementation does not prohibit configuring multiplexing and reindexing multiple indexes at once.

This reindex procedure is inherently safe because:

  • At any point in the process, the rollback procedure is a simple code change (i.e. revert PR, deploy).

  • The operation responsible for populating the secondary index is idempotent and decoupled from the index configuration, allowing it to undergo change iterations without aborting the entire process (thereby losing reindex progress).

  • Instructions for third party hosters can follow the same process that Dimagi uses, which guarantees that any possible problems encountered by a third party hoster are not outside the Dimagi main track.

Design Details

Reindex Procedure Details

  1. Configure multiplexing on an index by passing in secondary index name to create_document_adapter.

    • Ensure that there is a migration in place for creating the index (see Creating Elasticsearch Index Migrations above).

    • (Optional) If the reindex involves other meta-index changes (shards, mappings, etc), also update those configurations at this time.

      Note: Currently the adapter does not support reindexing on specific environments, but the design is compatible with adding that in the future. This support will be added once we get to V5 of ES.

    • Configure create_document_adapter to return an instance of ElasticMultiplexAdapter by passing in secondary index name.

      case_adapter = create_document_adapter(
          ElasticCase,
          "hqcases_2016-03-04",
          "case",
          secondary="hqcases_2022-10-20",
      )
      
    • Add a migration which performs all cluster-level operations required for the new (secondary) index. For example:

      • creates the new index

      • configures shards, replicas, etc for the index

      • sets the index mapping

    • Review, merge and deploy this change. At Django startup, the new (secondary) index will automatically and immediately begin receiving document writes. Document reads will always come from the primary index.

  2. Execute a management command to sync and verify the secondary index from the primary.

    Note: This command is not yet implemented.

    This management command is idempotent and performs four operations in serial. If any of the operations complete with unexpected results, the command will abort with an error.

    1. Executes an Elastic reindex request with parameters to populate the secondary index from the primary, configured to not overwrite existing documents in the target (secondary) index.

    2. Polls the reindex task progress, blocking until complete.

      Note: the reindex API also supports a “blocking” mode which may be advantageous due to limitations in Elasticsearch 2.4’s Task API. As such, step 2 might be removed in favor of a blocking reindex during the 2.4 to 5.x upgrade.

    3. Performs a cleanup operation on the secondary index to remove tombstone documents.

    4. Performs a verification pass to check integrity of the secondary index.

      Note: An exact verification algorithm has not been designed, and complex verification operations may be left out of the first implementation. The reason it is outlined in this design is to identify that verification is supported and would happen at this point in the process. The initial implementation will at least implement feature-equivalency with the previous process (i.e. ensure document counts are equal between the two indexes), and tentatively an “equivalency check” of document _id’s (tentative because checking equality while the multiplexer is running is a race condition).

    Example command (not yet implemented):

    ./manage.py elastic_sync_multiplexed ElasticBook
    
  3. Perform a primary/secondary “swap” operation one or more times as desired to run a “live test” on the new (secondary) index while keeping the old (primary) index up-to-date.

    • Reconfigure the adapter by swapping the “primary” and “secondary” index names.

    • Add a migration that cleans up tombstone documents on the “new primary” index prior to startup.

    Note: In theory, this step can be optional (e.g. if the sync procedure becomes sufficiently trusted in the future, or for “goldilocks” indexes where rebuilding from source is feasible but advantageous to avoid, etc).

  4. Disable multiplexing for the index.

    • Reconfigure the document adapter for the index by changing the “primary index name” to the value of the “secondary index name” and remove the secondary configuration (thus reverting the adapter back to a single-index adapter).

    • Add a migration that cleans up tombstone documents on the index.

    • Review, merge and deploy this change.

  5. Execute a management command to delete the old index. Example:

    ./manage.py prune_elastic_index ElasticBook
    

Elastic Client Adapters

The corehq.apps.es.client module encapsulates the CommCare HQ Elasticsearch client adapters. It implements a high-level Elasticsearch client protocol necessary to accomplish all interactions with the backend Elasticsearch cluster. Client adapters are split into two usage patterns, the “Management Adapter” and “Document Adapters”. Client adapters are instantiated at import time in order to perform index verification when Django starts. Downstream code that needs an adapter imports and uses the adapter instance.

Management Adapter

There is only one management adapter, corehq.apps.es.client.manager. This adapter is used for performing all cluster management tasks such as creating and updating indices and their mappings, changing index settings, changing cluster settings, etc. This functionality is split into a separate class for a few reasons:

  1. The management adapter is responsible for low-level Elastic operations which document adapters should never be performing because the scope of a document adapter does not extend beyond a single index.

  2. Elasticsearch 5+ implements security features which limit the kinds of operations a connection can be used for. The separation in these client adapter classes is designed to fit into that model.

from corehq.apps.es.client import manager

manager.index_create("books")
mapping = {"properties": {
    "author": {"type": "text"},
    "title": {"type": "text"},
    "published": {"type": "date"},
}}
manager.index_put_mapping("books", "book", mapping)
manager.index_refresh("books")
manager.index_delete("books")

Document Adapters

Document adapter classes are defined on a per-index basis and include specific properties and functionality necessary for maintaining a single type of “model” document in a single index. Each index in Elasticsearch needs to have a corresponding ElasticDocumentAdapter subclass which defines how the Python model is applied to that specific index. At the very least, a document adapter subclass must define the following:

  • A mapping which defines the structure and properties for documents managed by the adapter.

  • A from_python() classmethod which can convert a Python model object into the JSON-serializable format for writing into the adapter’s index.

The combination of (index_name, type) constrains the document adapter to a specific HQ document mapping. Comparing an Elastic cluster to a Postgres database (for the sake of analogy), the Elastic index is analogous to a Postgres schema object (e.g. public), and the _type property is analogous to a Postgres table object. The combination of both index name and _type fully constrains the properties that make up a specific Elastic document.

Document adapters are instantiated once at runtime, via the create_document_adapter() function. The purpose of this function is to act as a shim, returning either an ElasticDocumentAdapter instance or an ElasticMultiplexAdapter instance (see Multiplexing Document Adapters below), depending on whether or not a secondary index is defined by the secondary keyword argument.

A simple example of a document model and its corresponding adapter:

from corehq.apps.es.client import ElasticDocumentAdapter, create_document_adapter


class Book:

    def __init__(self, isbn, author, title, published):
        self.isbn = isbn
        self.author = author
        self.title = title
        self.published = published


class ElasticBook(ElasticDocumentAdapter):

    mapping = {"properties": {
        "author": {"type": "text"},
        "title": {"type": "text"},
        "published": {"type": "date"},
    }}

    @classmethod
    def from_python(cls, book):
        source = {
            "author": book.author,
            "title": book.title,
            "published": book.published,
        }
        return book.isbn, source


books_adapter = create_document_adapter(
    ElasticBook,
    index_name="books",
    type_="book",
)

Using this adapter in practice might look as follows:

import datetime

# index a new book
new_book = Book(
    "978-1491946008",
    "Luciano Ramalho",
    "Fluent Python: Clear, Concise, and Effective Programming",
    datetime.date(2015, 2, 10),
)
books_adapter.index(new_book)
# fetch existing
classic_book = books_adapter.get("978-0345391803")

Multiplexing Document Adapters

The ElasticMultiplexAdapter is a wrapper around two ElasticDocumentAdapter instances: a primary and a secondary. The multiplexing adapter provides the same public methods as a standard document adapter, but it performs Elasticsearch write operations against both indexes in order to keep them in step with document changes. The multiplexing adapter provides the following functionality:

  • All read operations (exists(), get(), search(), etc) are always performed against the primary adapter only. Read requests are never performed against the secondary adapter.

  • The update() write method always results in two sequential requests against the underlying indexes:

    1. An update request against the primary adapter that simultaneously fetches the full, post-update document body.

    2. An upsert update request against the secondary adapter with the document returned in the primary update response.

  • All other write operations (index(), delete(), bulk(), etc) leverage the Elasticsearch Bulk API to perform the required operations against both indexes simultaneously in as few requests against the backend as possible (a single request in some cases).

    • The index() method always achieves the index into both indexes with a single request.

    • The delete() method attempts to perform the delete against both indexes in a single request, and will only perform a second request in order to index a tombstone on the secondary (if the primary delete succeeded and the secondary delete failed with a 404 status).

    • The bulk() method (the underlying method for all bulk operations) performs actions against both indexes simultaneously by chunking the actions prior to calling elasticsearch.helpers.bulk() (as opposed to relying on that function to perform the chunking). This allows all bulk actions to be applied against both the primary and secondary indexes in parallel, thereby keeping both indexes synchronized throughout the duration of potentially large (multi-request) bulk operations.
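The read/write split described above can be sketched with a toy, dict-backed model. Everything here (ToyAdapter, ToyMultiplexAdapter, create_toy_adapter) is hypothetical and exists only for illustration. In particular, the real adapter uses the Bulk API to write to both indexes in as few requests as possible, whereas this toy simply performs two sequential writes.

```python
class ToyAdapter:
    """Minimal single-index adapter backed by a dict."""

    def __init__(self, index_name):
        self.index_name = index_name
        self.docs = {}

    def index(self, doc_id, source):
        self.docs[doc_id] = source

    def get(self, doc_id):
        return self.docs[doc_id]


class ToyMultiplexAdapter:
    """Reads from the primary only; writes go to primary and secondary."""

    def __init__(self, primary, secondary):
        self.primary = primary
        self.secondary = secondary

    def index(self, doc_id, source):
        self.primary.index(doc_id, source)
        self.secondary.index(doc_id, source)

    def get(self, doc_id):
        return self.primary.get(doc_id)


def create_toy_adapter(index_name, secondary=None):
    """Shim: return a multiplexing adapter only when `secondary` is given."""
    primary = ToyAdapter(index_name)
    if secondary is None:
        return primary
    return ToyMultiplexAdapter(primary, ToyAdapter(secondary))


adapter = create_toy_adapter("books", secondary="books-new")
adapter.index("978-1491946008", {"title": "Fluent Python"})
book = adapter.get("978-1491946008")
```

Because both adapter types expose the same methods, calling code does not need to know whether an index is currently multiplexed.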

Tombstone

The concept of a tombstone in the ES multiplexer is a placeholder for documents that get deleted on the primary index before they have been indexed on the secondary index. Whenever an adapter is multiplexed and a document is deleted, the secondary index will receive a tombstone entry for that document if and only if the primary index delete succeeds and the secondary index delete fails due to a not found condition (404). The Python class defined to represent these tombstones is corehq.apps.es.client.Tombstone.

Scenario without tombstones: If a multiplexing adapter deletes a document in the secondary index (which turns out to be a no-op because the document does not exist there yet), and then that same document is copied to the secondary index by the reindexer, then it will exist indefinitely in the secondary even though it has been deleted in the primary.

Put another way:

  • Reindexer: gets batch of objects from primary index to copy to secondary.

  • Multiplexer: deletes a document in that batch (in both primary and secondary indexes).

  • Reindexer: writes deleted (now stale) document into secondary index.

  • Result: secondary index contains a document that has been deleted.

With tombstones: this will not happen because the reindexer uses an “ignore existing documents” copy mode, so it will never overwrite a tombstone with a stale (deleted) document.

Tombstones will only exist in the secondary index and will be deleted as a final step following a successful sync (reindex) operation. Since tombstones can only be created while the primary and secondary indexes are out of sync (secondary index does not yet contain all primary documents), then once the sync is complete, the multiplexer will no longer create new tombstones.

A sample tombstone document would look like

{
  "__is_tombstone__" : True
}
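The race described in this section, and how a tombstone resolves it, can be replayed with plain dicts standing in for the two indexes. This is a simplified model under assumptions: the function names are hypothetical, the "ignore existing documents" copy mode is modelled with dict.setdefault, and a missing key in the secondary stands in for the 404 response.

```python
TOMBSTONE = {"__is_tombstone__": True}


def multiplexed_delete(primary, secondary, doc_id):
    """Delete from both indexes; leave a tombstone if the secondary lacked it."""
    del primary[doc_id]
    if secondary.pop(doc_id, None) is None:  # the "404" case
        secondary[doc_id] = dict(TOMBSTONE)


def reindex_batch(batch, secondary):
    """Copy a batch into the secondary without overwriting existing docs."""
    for doc_id, source in batch.items():
        secondary.setdefault(doc_id, source)


def remove_tombstones(secondary):
    """Final cleanup once the sync has finished."""
    for doc_id in [k for k, v in secondary.items() if v == TOMBSTONE]:
        del secondary[doc_id]


primary = {"a": {"n": 1}, "b": {"n": 2}}
secondary = {}
batch = dict(primary)                        # reindexer reads a batch
multiplexed_delete(primary, secondary, "a")  # delete races with the copy
reindex_batch(batch, secondary)              # stale "a" does not overwrite
remove_tombstones(secondary)
```

After the cleanup the secondary holds exactly the documents that remain in the primary, so the deleted document "a" does not reappear.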

Code Documentation

HQ Elasticsearch client logic (adapters).

class corehq.apps.es.client.BaseAdapter

Base adapter that includes methods common to all adapters.

__init__()
info()

Return the Elasticsearch server info.

ping()

Ping the Elasticsearch service.

class corehq.apps.es.client.BulkActionItem(op_type, doc=None, doc_id=None)

A wrapper for documents to be processed via Elasticsearch’s Bulk API. Collections of these objects can be passed to an ElasticDocumentAdapter’s .bulk() method for processing.

Instances of this class are meant to be acquired via one of the factory methods rather than instantiating directly (via __init__()).

class OpType(value)

An enumeration.

__init__(op_type, doc=None, doc_id=None)
classmethod delete(doc)

Factory method for a document delete action

classmethod delete_id(doc_id)

Factory method for a document delete action providing only the ID

classmethod index(doc)[source]

Factory method for a document index action

property is_delete

True if this is a delete action, otherwise False.

property is_index

True if this is an index action, otherwise False.

class corehq.apps.es.client.ElasticDocumentAdapter(index_name, type_)[source]

Base for subclassing document-specific adapters.

Subclasses must define the following:

  • mapping: attribute (dict)

  • from_python(...): classmethod for converting models into Elastic format

__init__(index_name, type_)[source]

A document adapter for a single index.

Parameters:
  • index_name – the name of the index that this adapter interacts with

  • type_ – the index _type for the mapping

bulk(actions, refresh=False, raise_errors=True)[source]

Use the Elasticsearch library’s bulk() helper function to process documents en masse.

Equivalent to the legacy ElasticsearchInterface.bulk_ops(...) method.

Parameters:
  • actions – iterable of BulkActionItem instances

  • refresh – (bool) refresh the affected shards to make this operation visible to search

  • raise_errors – whether or not exceptions should be raised if bulk actions fail. The default (True) matches that of the elasticsearch-py library’s bulk() helper function (i.e. raise).

bulk_delete(doc_ids, **bulk_kw)[source]

Convenience method for bulk deleting many documents by ID without the BulkActionItem boilerplate.

Parameters:
  • doc_ids – iterable of document IDs to be deleted

  • bulk_kw – extra parameters passed verbatim to the ElasticDocumentAdapter.bulk() method.

bulk_index(docs, **bulk_kw)[source]

Convenience method for bulk indexing many documents without the BulkActionItem boilerplate.

Parameters:
  • docs – iterable of (Python model) documents to be indexed

  • bulk_kw – extra parameters passed verbatim to the ElasticDocumentAdapter.bulk() method.

count(query)[source]

Return the number of documents matched by the query

Parameters:

query – (dict) query body

Returns:

int

delete(doc_id, refresh=False)[source]

Delete an existing document from Elasticsearch

Equivalent to the legacy ElasticsearchInterface.delete_doc(...) method.

Parameters:
  • doc_id – (str) ID of the document to delete

  • refresh – (bool) refresh the affected shards to make this operation visible to search

delete_tombstones()[source]

Deletes all tombstone documents present in the index

TODO: This should be replaced by delete_by_query https://www.elastic.co/guide/en/elasticsearch/reference/5.1/docs-delete-by-query.html when on ES version >= 5

exists(doc_id)[source]

Check if a document exists for the provided doc_id

Equivalent to the legacy ElasticsearchInterface.doc_exists(...) method.

Parameters:

doc_id – (str) ID of the document to be checked

Returns:

bool

export_adapter()[source]

Get an instance of this document adapter configured for “export” queries (i.e. the low-level Elasticsearch client object is configured with longer request timeouts, etc).

from_python(doc)[source]

Transform a Python model object or model dict into the json-serializable (dict) format suitable for indexing in Elasticsearch.

Parameters:

doc – document (instance of a Python model) or a dict representation of that model

Returns:

tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch

get(doc_id, source_includes=None)[source]

Return the document for the provided doc_id

Equivalent to the legacy ElasticsearchInterface.get_doc(...) method.

Parameters:
  • doc_id – (str) ID of the document to be fetched

  • source_includes – a list of fields to extract and return. If None (the default), the entire document is returned.

Returns:

dict

get_docs(doc_ids)[source]

Return multiple docs for the provided doc_ids

Equivalent to the legacy ElasticsearchInterface.get_bulk_docs(...) method.

Parameters:

doc_ids – iterable of document IDs (str’s)

Returns:

dict

index(doc, refresh=False)[source]

Index (send) a new document in (to) Elasticsearch

Equivalent to the legacy ElasticsearchInterface.index_doc(...) method.

Parameters:
  • doc – the (Python model) document to index

  • refresh – (bool) refresh the affected shards to make this operation visible to search

iter_docs(doc_ids, chunk_size=100)[source]

Return a generator which fetches documents in chunks.

Parameters:
  • doc_ids – iterable of document IDs (str)

  • chunk_size – (int) number of documents to fetch per query

Yields:

dict documents
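The chunked fetching described for iter_docs() can be sketched as follows. This is a minimal stand-in, not the adapter's implementation: fetch_many is a hypothetical callable that returns the documents for a list of IDs, and the in-memory store replaces Elasticsearch.

```python
from itertools import islice


def iter_chunks(doc_ids, chunk_size=100):
    """Yield lists of at most ``chunk_size`` IDs from any iterable."""
    iterator = iter(doc_ids)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def iter_docs(fetch_many, doc_ids, chunk_size=100):
    """Fetch documents one chunk at a time and yield them individually."""
    for chunk in iter_chunks(doc_ids, chunk_size):
        yield from fetch_many(chunk)


# Hypothetical in-memory "index" used only for this demonstration.
store = {str(i): {"_id": str(i)} for i in range(250)}
calls = []


def fetch_many(ids):
    calls.append(len(ids))  # record how many IDs each query asked for
    return [store[i] for i in ids]


docs = list(iter_docs(fetch_many, store.keys(), chunk_size=100))
print(len(docs), calls)
```

Because the function is a generator, no query is issued until the caller starts iterating, and at most one chunk of documents is held in memory at a time.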

scroll(query, scroll='5m', size=None)[source]

Perform a scrolling search, yielding each doc until the entire context is exhausted.

Parameters:
  • query – (dict) raw search query.

  • scroll – (str) time value specifying how long the Elastic cluster should keep the search context alive.

  • size – (int) scroll size (number of documents per “scroll” page). When set to None (the default), the default scroll size is used.

Yields:

dict documents

search(query, **kw)[source]

Perform a query (search) and return the result.

Parameters:
  • query – (dict) search query to execute

  • kw – extra parameters passed directly to the underlying elasticsearch.Elasticsearch.search() method.

Returns:

dict

to_json(doc)[source]

Convenience method that returns the full “from python” document (including the _id key, if present) as it would be returned by an adapter search result.

This method is not used by the adapter itself, and is only present for other code which wishes to work with documents in a couch-like format.

Parameters:

doc – document (instance of a Python model)

update(doc_id, fields, return_doc=False, refresh=False, _upsert=False, retry_on_conflict=None)[source]

Update an existing document in Elasticsearch

Equivalent to the legacy ElasticsearchInterface.update_doc_fields(...) method.

Parameters:
  • doc_id – (str) ID of the document to update

  • fields – (dict) name/values to update on the existing Elastic doc

  • return_doc – (bool) return the full updated doc. When False (the default), None is returned.

  • refresh – (bool) refresh the affected shards to make this operation visible to search

  • _upsert – (bool) only needed for multiplexing; use the index() method instead. Create a new document if one doesn’t already exist. When False (the default), performing an update request for a missing document will raise an exception.

  • retry_on_conflict – (int) number of times to retry the update if there is a conflict. Ignored if None (the default). Otherwise, the value is passed directly to the low-level update() method.

Returns:

dict or None

class corehq.apps.es.client.ElasticManageAdapter[source]
cancel_task(task_id)[source]

Cancels a running task in ES

Parameters:

task_id – (str) ID of the task

Returns:

dict of task details

Raises:

TaskError or TaskMissing (subclass of TaskError)

cluster_health(index=None)[source]

Return the Elasticsearch cluster health.

cluster_routing(*, enabled)[source]

Enable or disable cluster routing.

Parameters:

enabled – (bool) whether to enable or disable routing

get_aliases()[source]

Return the cluster aliases information.

Returns:

dict with format {<alias>: [<index>, ...], ...}

get_indices()[source]

Return the cluster index information of active indices.

Returns:

dict

get_node_info(node_id, metric)[source]

Return a specific metric from the node info for an Elasticsearch node.

Parameters:
  • node_id – (str) ID of the node

  • metric – (str) name of the metric to fetch

Returns:

deserialized JSON (dict, list, str, etc)

get_task(task_id)[source]

Return the details for an active task

Parameters:

task_id – (str) ID of the task

Returns:

dict of task details

Raises:

TaskError or TaskMissing (subclass of TaskError)

index_close(index)[source]

Close an index.

Parameters:

index – (str) index name

index_configure_for_reindex(index)[source]

Update an index with settings optimized for reindexing.

Parameters:

index – (str) index for which to change the settings

index_configure_for_standard_ops(index)[source]

Update an index with settings optimized for standard HQ performance.

Parameters:

index – (str) index for which to change the settings

index_create(index, metadata=None)[source]

Create a new index.

Parameters:
  • index – (str) index name

  • metadata – (dict) full index metadata (mappings, settings, etc)

index_delete(index)[source]

Delete an existing index.

Parameters:

index – (str) index name

index_exists(index)[source]

Check if index refers to a valid index identifier (index name or alias).

Parameters:

index – (str) index name or alias

Returns:

bool

index_flush(index)[source]

Flush an index.

Parameters:

index – (str) index name

index_get_mapping(index, type_)[source]

Returns the current mapping for a doc type on an index.

Parameters:
  • index – (str) index to fetch the mapping from

  • type_ – (str) doc type to fetch the mapping for

Returns:

mapping dict or None if index does not have a mapping

index_get_settings(index, values=None)[source]

Returns the current settings for an index.

Parameters:
  • index – (str) index to fetch settings for

  • values – Optional collection of explicit settings to provide in the return value. If None (the default) all settings are returned.

Returns:

dict

Raises:

KeyError (only if invalid values are provided)

index_put_alias(index, name)[source]

Assign an alias to an existing index. This uses the Elasticsearch.update_aliases() method to perform both ‘remove’ and ‘add’ actions simultaneously, which is atomic on the server-side. This ensures that the alias is only assigned to one index at a time, and that (if present) an existing alias does not vanish momentarily.

See: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html

Parameters:
  • index – (str) name of the index to be aliased

  • name – (str) name of the alias to assign to index
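The atomic “remove and add” behaviour comes from sending both actions in one request body. The sketch below only builds such a body in the format accepted by Elasticsearch's aliases API; the helper name build_alias_actions and the index names are hypothetical, and how the adapter discovers the currently aliased indices is not shown.

```python
def build_alias_actions(alias, new_index, current_indices):
    """Return an aliases-API body that moves ``alias`` onto ``new_index``.

    ``current_indices`` is the list of indices the alias points at right now.
    """
    actions = [
        {"remove": {"index": old, "alias": alias}}
        for old in current_indices
        if old != new_index
    ]
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}


# Hypothetical index names, for illustration only.
body = build_alias_actions("xforms", "xforms_new", ["xforms_old"])
print(body)
```

Because all actions travel in a single request, readers of the alias never observe a moment where it points at no index or at two indices.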

index_put_mapping(index, type_, mapping)[source]

Update the mapping for a doc type on an index.

Parameters:
  • index – (str) index where the mapping should be updated

  • type_ – (str) doc type to update on the index

  • mapping – (dict) mapping for the provided doc type

index_refresh(index)[source]

Convenience method for refreshing a single index.

index_set_replicas(index, replicas)[source]

Set the number of replicas for an index.

Parameters:
  • index – (str) index for which to change the replicas

  • replicas – (int) number of replicas

index_validate_query(index, query, params={})[source]

Returns True if the passed query is valid, otherwise False.

indices_info()[source]

Retrieve meta information about all the indices in the cluster. This also includes closed indices.

Returns:

dict with index names as keys and index meta information as values

indices_refresh(indices)[source]

Refresh a list of indices.

Parameters:

indices – iterable of index names or aliases

reindex(source, dest, wait_for_completion=False, refresh=False, batch_size=1000, requests_per_second=None, copy_doc_ids=True, query=None)[source]

Starts the reindex process in the Elasticsearch cluster

Parameters:
  • source – (str) name of the source index

  • dest – (str) name of the destination index

  • wait_for_completion – (bool) block the request until the reindex is complete

  • refresh – (bool) refresh the index

  • requests_per_second – (int) throttles the rate at which reindex issues batches of index operations by padding each batch with a wait time.

  • batch_size – (int) the size of the scroll batch used by the reindex process. Larger batches may process more quickly but risk errors if the documents are too large. 1000 is the recommended maximum and the Elasticsearch default, and can be reduced if you encounter scroll timeouts.

  • query – (dict) optional parameter to include a term query to filter which documents are included in the reindex

Returns:

None if wait_for_completion is True, otherwise the task_id of the reindex task

class corehq.apps.es.client.ElasticMultiplexAdapter(primary_adapter, secondary_adapter)[source]
__init__(primary_adapter, secondary_adapter)[source]
bulk(actions, refresh=False, raise_errors=True)[source]

Apply bulk actions on the primary and secondary.

Bulk actions are applied against the primary and secondary in chunks of 500 actions at a time (replicates the behavior of the bulk() helper function). Chunks are applied against the primary and secondary simultaneously by chunking the original actions in blocks of (up to) 250 and performing a single block of (up to) 500 actions against both indexes in parallel. Tombstone documents are indexed on the secondary for any delete actions which succeed on the primary but fail on the secondary.

delete(doc_id, refresh=False)[source]

Delete from both primary and secondary via the bulk() method in order to perform both actions in a single HTTP request (two, if a tombstone is required).

index(doc, refresh=False)[source]

Index into both primary and secondary via the bulk() method in order to perform both actions in a single HTTP request.

update(doc_id, fields, return_doc=False, refresh=False, _upsert=False, **kw)[source]

Update on the primary adapter, fetching the full doc; then upsert the secondary adapter.

class corehq.apps.es.client.Tombstone(doc_id)[source]

Used to create tombstone documents in the secondary index when a document is deleted from the primary index.

This is required to avoid a potential race condition that might occur when the reindex process runs alongside the multiplexer.

__init__(doc_id)[source]
corehq.apps.es.client.create_document_adapter(cls, index_name, type_, *, secondary=None)[source]

Creates and returns a document adapter instance for the parameters provided.

Note that the behaviour of this function can be altered with Django settings.

The function returns a multiplexed adapter only if:
  • ES_<app name>_INDEX_MULTIPLEXED is True

  • a secondary index is provided

The indexes are swapped only if:
  • ES_<app_name>_INDEX_SWAPPED is True

  • a secondary index is provided

If both ES_<app name>_INDEX_MULTIPLEXED and ES_<app_name>_INDEX_SWAPPED are True, the primary index acts as the secondary index and vice versa.

Parameters:
  • cls – an ElasticDocumentAdapter subclass

  • index_name – the name of the index that the adapter interacts with

  • type_ – the index _type for the adapter’s mapping.

  • secondary – the name of the secondary index in a multiplexing configuration. If an index name is provided and ES_<app name>_INDEX_MULTIPLEXED is set to True, then returned adapter will be an instance of ElasticMultiplexAdapter. If None (the default), the returned adapter will be an instance of cls. ES_<app name>_INDEX_MULTIPLEXED will be ignored if secondary is None.

Returns:

a document adapter instance.
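The way the two settings interact can be summarised as a small decision function. This is a sketch of the rules as documented above, not the actual implementation: choose_adapter and its return tuples are hypothetical, the settings are passed in as plain booleans, and the “swapped but not multiplexed” branch (a single adapter on the secondary index) is an assumption that the text above does not spell out.

```python
def choose_adapter(index_name, secondary=None, multiplexed=False, swapped=False):
    """Describe which adapter would be returned for a given configuration.

    Returns ("single", index) or ("multiplex", primary, secondary).
    """
    # Both settings are ignored when no secondary index is provided.
    if secondary is None or not (multiplexed or swapped):
        return ("single", index_name)
    # Swapping exchanges the roles of the two indexes.
    primary, other = (secondary, index_name) if swapped else (index_name, secondary)
    if multiplexed:
        return ("multiplex", primary, other)
    # Assumption: swapped without multiplexing reads/writes the secondary only.
    return ("single", primary)


print(choose_adapter("forms-old", "forms-new", multiplexed=True))
print(choose_adapter("forms-old", "forms-new", multiplexed=True, swapped=True))
```

The index names are placeholders; in HQ the booleans would come from the ES_<app name>_INDEX_MULTIPLEXED and ES_<app_name>_INDEX_SWAPPED settings.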

corehq.apps.es.client.get_client(for_export=False)[source]

Get an elasticsearch client instance.

Parameters:

for_export – (optional bool) specifies whether the returned client should be optimized for slow export queries.

Returns:

elasticsearch.Elasticsearch instance.


Querying Elasticsearch

ESQuery

ESQuery is a library for building elasticsearch queries in a friendly, more readable manner.

Basic usage

There should be a file and subclass of ESQuery for each index we have.

Each method returns a new object, so you can chain calls together like SQLAlchemy. Here’s an example usage:

q = (FormES()
     .domain(self.domain)
     .xmlns(self.xmlns)
     .submitted(gte=self.datespan.startdate_param,
                lt=self.datespan.enddate_param)
     .source(['xmlns', 'domain', 'app_id'])
     .sort('received_on', desc=False)
     .size(self.pagination.count)
     .start(self.pagination.start)
     .terms_aggregation('babies.count', 'babies_saved'))
result = q.run()
total_docs = result.total
hits = result.hits

Generally useful filters and queries should be abstracted away for re-use, but you can always add your own like so:

q.filter({"some_arbitrary_filter": {...}})
q.set_query({"fancy_query": {...}})

For debugging or more helpful error messages, you can use query.dumps() and query.pprint(). Both use json.dumps(), and their output is suitable for pasting into ES Head, Marvel, or a similar tool.

Filtering

Filters are implemented as standalone functions, so they can be composed and nested: q.OR(web_users(), mobile_users()). Filters can be passed to the query.filter method: q.filter(web_users()).

There is some syntactic sugar that lets you skip this boilerplate and just call the filter as if it were a method on the query class: q.web_users(). In order to be available for this shorthand, filters are added to the builtin_filters property of the main query class. I know that’s a bit confusing, but it seemed like the best way to make filters available in both contexts.
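The two calling styles can be illustrated with a toy query class. Everything here is a hypothetical stand-in: MiniQuery is not ESQuery, the doc_type values are illustrative, and the bool/should form used for OR is an assumption about the generated JSON. The point is only to show how a list of builtin filters can double as instance methods.

```python
def term(field, value):
    return {"term": {field: value}}


def OR(*filters):
    # Assumed representation: a bool query whose "should" clauses are the filters.
    return {"bool": {"should": list(filters)}}


def web_users():
    return term("doc_type", "WebUser")


def mobile_users():
    return term("doc_type", "CommCareUser")


class MiniQuery:
    builtin_filters = [web_users, mobile_users]

    def __init__(self):
        self.filters = []

    def filter(self, filter_):
        clone = MiniQuery()  # each call returns a new object, so calls can be chained
        clone.filters = self.filters + [filter_]
        return clone

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails.
        for fn in self.builtin_filters:
            if fn.__name__ == name:
                return lambda *args, **kw: self.filter(fn(*args, **kw))
        raise AttributeError(name)


q = MiniQuery()
explicit = q.filter(web_users())
shorthand = q.web_users()
either = q.filter(OR(web_users(), mobile_users()))
print(explicit.filters == shorthand.filters, either.filters)
```

Both spellings produce the same filter list, and the original q is left untouched.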

Generic filters applicable to all indices are available in corehq.apps.es.filters. (But most/all can also be accessed as a query method, if appropriate)

Filtering Specific Indices

There is a file for each elasticsearch index (if not, feel free to add one). This file provides filters specific to that index, as well as an appropriately-directed ESQuery subclass with references to these filters.

These index-specific query classes also have default filters to exclude things like inactive users or deleted docs. These things should nearly always be excluded, but if necessary, you can remove these with remove_default_filters.

Language

  • es_query - the entire query, filters, query, pagination

  • filters - a list of the individual filters

  • query - the query, used for searching, not filtering

  • field - a field on the document. User docs have a ‘domain’ field.

  • lt/gt - less/greater than

  • lte/gte - less/greater than or equal to

class corehq.apps.es.es_query.ESQuery(index=None, for_export=False)[source]

This query builder only outputs the following query structure:

{
  "query": {
    "bool": {
      "filter": {
        "and": [
          <filters>
        ]
      },
      "query": <query>
    }
  },
  <size, sort, other params>
}
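The structure above can be assembled by a small helper. This is a minimal sketch that mirrors the layout exactly as documented here, not the builder's real code; build_es_query is a hypothetical name, and falling back to a match_all query when none is given is an assumption.

```python
import json


def build_es_query(filters, query=None, **params):
    """Assemble a query body in the documented shape."""
    body = {
        "query": {
            "bool": {
                "filter": {"and": list(filters)},
                # Assumed default: a no-op query when none is supplied.
                "query": query if query is not None else {"match_all": {}},
            }
        }
    }
    body.update(params)  # size, sort and other top-level parameters
    return body


body = build_es_query(
    [{"term": {"domain": "demo"}}],
    size=10,
    sort=[{"received_on": {"order": "asc"}}],
)
print(json.dumps(body, indent=2))
```

Keeping filters in a list makes it straightforward to append further filters before the body is serialized.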
__init__(index=None, for_export=False)[source]
add_query(new_query, clause)[source]

Add a query to the current list of queries

aggregation(aggregation)[source]

Add the passed-in aggregation to the query

property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

dumps(pretty=False)[source]

Returns the JSON query that will be sent to elasticsearch.

exclude_source()[source]

Turn off _source retrieval. Mostly useful if you just want the doc_ids

fields(fields)[source]

Restrict the fields returned from elasticsearch

Deprecated. Use source instead.

filter(filter)[source]

Add the passed-in filter to the query. All filtering goes through this class.

property filters

Return a list of the filters used in this query, suitable if you want to reproduce a query with additional filtering.

get_ids()[source]

Performs a minimal query to get the ids of the matching documents

For very large sets of IDs, use scroll_ids instead

nested_sort(path, field_name, nested_filter, desc=False, reset_sort=True, sort_missing=None)[source]

Order results by the value of a nested field

pprint()[source]

pretty prints the JSON query that will be sent to elasticsearch.

remove_default_filter(default)[source]

Remove a specific default filter by passing in its name.

remove_default_filters()[source]

Sensible defaults are provided. Use this if you don’t want ‘em

run()[source]

Actually run the query. Returns an ESQuerySet object.

scroll()[source]

Run the query against the scroll api. Returns an iterator yielding each document that matches the query.

scroll_ids()[source]

Returns a generator of all matching ids

scroll_ids_to_disk_and_iter_docs()[source]

Returns a ScanResult for all matched documents.

Used for iterating docs for a very large query where consuming the docs via self.scroll() may exceed the amount of time that the scroll context can remain open. This is achieved by:

  1. Fetching the IDs for all matched documents (via scroll_ids()) and caching them in a temporary file on disk, then

  2. fetching the documents by (chunked blocks of) IDs streamed from the temporary file.
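The two steps can be sketched with the standard library alone. This is an illustration of the approach, not the method's implementation: ids_to_disk_then_iter_docs and fetch_many are hypothetical names, and an in-memory dict stands in for Elasticsearch.

```python
import tempfile
from itertools import islice


def ids_to_disk_then_iter_docs(scroll_ids, fetch_many, chunk_size=100):
    """Cache all matching IDs in a temporary file, then fetch docs by chunks of IDs."""
    with tempfile.TemporaryFile(mode="w+", encoding="utf-8") as id_file:
        # Step 1: drain the (time-limited) ID scroll quickly onto disk.
        for doc_id in scroll_ids:
            id_file.write(doc_id + "\n")
        id_file.seek(0)
        # Step 2: stream the IDs back and fetch the documents in blocks.
        lines = (line.rstrip("\n") for line in id_file)
        while True:
            chunk = list(islice(lines, chunk_size))
            if not chunk:
                break
            yield from fetch_many(chunk)


store = {"a": {"_id": "a"}, "b": {"_id": "b"}, "c": {"_id": "c"}}
matched_ids = list(store)  # IDs matched while the scroll was open
del store["b"]             # a document deleted before it is fetched in full


def fetch_many(chunk):
    return [store[i] for i in chunk if i in store]


docs = list(ids_to_disk_then_iter_docs(iter(matched_ids), fetch_many, chunk_size=2))
print([d["_id"] for d in docs])
```

The deleted document is silently absent from the output, which is the second caveat listed for this method.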

Original design PR: https://github.com/dimagi/commcare-hq/pull/20282

Caveats:
  • There is no guarantee that the returned ScanResult’s count property will match the number of yielded docs.

  • Documents that are present when scroll_ids() is called, but are deleted prior to being fetched in full will be missing from the results, and this scenario will not raise an exception.

  • If Elastic document ID values are ever reused (i.e. new documents are created with the same ID of a previously-deleted document) then this method would become unsafe because it could yield documents that were not matched by the query.

search_string_query(search_string, default_fields)[source]

Accepts a user-defined search string

set_query(query)[source]

Set the query. Most stuff we want is better done with filters, but if you actually want Levenshtein distance or prefix querying…

set_sorting_block(sorting_block)[source]

To be used with get_sorting_block, which interprets datatables sorting

size(size)[source]

Restrict number of results returned. Analogous to SQL limit, except when performing a scroll, in which case this value becomes the number of results to fetch per scroll request.

sort(field, desc=False, reset_sort=True)[source]

Order the results by field.

source(include, exclude=None)[source]

Restrict the output of _source in the queryset. This can be used to return an object in a queryset

start(start)[source]

Pagination. Analogous to SQL offset.

values(*fields)[source]

Modeled after Django’s QuerySet.values.

class corehq.apps.es.es_query.ESQuerySet(raw, query)[source]
The object returned from ESQuery.run
  • ESQuerySet.raw is the raw response from elasticsearch

  • ESQuerySet.query is the ESQuery object

__init__(raw, query)[source]
property doc_ids

Return just the docs ids from the response.

property hits

Return the docs from the response.

static normalize_result(query, result)[source]

Return the doc from an item in the query response.

property total

Return the total number of docs matching the query.

class corehq.apps.es.es_query.HQESQuery(index=None, for_export=False)[source]

Query logic specific to CommCare HQ

property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

exception corehq.apps.es.es_query.InvalidQueryError[source]

Query parameters cannot be assembled into a valid search.

Available Filters

The following filters are available on any ESQuery instance - you can chain any of these on your query.

Note also that the term filter accepts either a list or a single element. Simple filters which match against a field are based on this filter, so those will also accept lists. That means you can do form_query.xmlns(XMLNS1) or form_query.xmlns([XMLNS1, XMLNS2, ...]).

Contributing: Additions to this file should be added to the builtin_filters method on either ESQuery or HQESQuery, as appropriate (is it an HQ thing?).

corehq.apps.es.filters.AND(*filters)[source]

Filter docs to match all of the filters passed in

corehq.apps.es.filters.NOT(filter_)[source]

Exclude docs matching the filter passed in

corehq.apps.es.filters.OR(*filters)[source]

Filter docs to match any of the filters passed in

corehq.apps.es.filters.date_range(field, gt=None, gte=None, lt=None, lte=None)[source]

Range filter that accepts date and datetime objects as arguments

corehq.apps.es.filters.doc_id(doc_id)[source]

Filter by doc_id. Also accepts a list of doc ids

corehq.apps.es.filters.doc_type(doc_type)[source]

Filter by doc_type. Also accepts a list

corehq.apps.es.filters.domain(domain_name)[source]

Filter by domain.

corehq.apps.es.filters.empty(field)[source]

Only return docs with a missing or null value for field

corehq.apps.es.filters.exists(field)[source]

Only return docs which have a value for field

corehq.apps.es.filters.geo_bounding_box(field, top_left, bottom_right)[source]

Only return geopoints stored in field that are located within the bounding box defined by top_left and bottom_right.

top_left and bottom_right accept a range of data types and formats.

More info: Geo Bounding Box Query

corehq.apps.es.filters.geo_grid(field, geohash)[source]

Filters cases by the geohash grid cell in which they are located.

corehq.apps.es.filters.geo_polygon(field, points)[source]

Filters geo_point values in field that fall within the polygon described by the list of points.

More info: Geo Polygon Query

Parameters:
  • field – A field with Elasticsearch data type geo_point.

  • points – A list of points that describe a polygon. Elasticsearch supports a range of formats for list items.

Returns:

A filter dict.

corehq.apps.es.filters.geo_shape(field, shape, relation='intersects')[source]

Filters cases by case properties indexed using the geo_point type.

More info: The Geoshape query reference

Parameters:
  • field – The field where geopoints are stored

  • shape – A shape definition given in GeoJSON geometry format. More info: The GeoJSON specification (RFC 7946)

  • relation – The relation between the shape and the case property values.

Returns:

A filter definition

corehq.apps.es.filters.missing(field)[source]

Only return docs missing a value for field

corehq.apps.es.filters.nested(path, filter_)[source]

Query nested documents which normally can’t be queried directly

corehq.apps.es.filters.non_null(field)[source]

Only return docs with a real, non-null value for field

corehq.apps.es.filters.range_filter(field, gt=None, gte=None, lt=None, lte=None)[source]

Filter field by a range. Pass in some sensible combination of gt (greater than), gte (greater than or equal to), lt, and lte.
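A range filter of this kind boils down to a dict containing only the bounds that were supplied. The sketch below uses the standard Elasticsearch range syntax; range_filter_sketch is a hypothetical stand-in and may differ in detail from the real function.

```python
def range_filter_sketch(field, gt=None, gte=None, lt=None, lte=None):
    """Build a range filter, leaving out any bound that was not given."""
    bounds = {"gt": gt, "gte": gte, "lt": lt, "lte": lte}
    return {"range": {field: {k: v for k, v in bounds.items() if v is not None}}}


print(range_filter_sketch("age", gte=18, lt=65))
```

Comparing against None rather than truthiness matters here, because 0 is a perfectly valid bound.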

corehq.apps.es.filters.term(field, value)[source]

Filter docs by a field. value can be a single value or a list.
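Accepting either form usually means choosing between Elasticsearch's term and terms filters based on the argument type. A minimal sketch, assuming that is how the dispatch works (term_sketch is a hypothetical name, and XMLNS1/XMLNS2 are placeholder values):

```python
def term_sketch(field, value):
    """Return a ``terms`` filter for lists/tuples, otherwise a ``term`` filter."""
    if isinstance(value, (list, tuple)):
        return {"terms": {field: list(value)}}
    return {"term": {field: value}}


XMLNS1, XMLNS2 = "http://example.com/form1", "http://example.com/form2"
print(term_sketch("xmlns", XMLNS1))
print(term_sketch("xmlns", [XMLNS1, XMLNS2]))
```

Any simple filter built on top of such a function inherits the list support for free.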

Available Queries

Queries are used for actual searching - things like relevancy scores, Levenshtein distance, and partial matches.

View the elasticsearch documentation to see what other options are available, and put ‘em here if you end up using any of ‘em.

corehq.apps.es.queries.filtered(query, filter_)[source]

Filtered query for performing both filtering and querying at once

corehq.apps.es.queries.geo_distance(field, geopoint, **kwargs)[source]

Filters cases to those within a certain distance of the provided geopoint

e.g. geo_distance('gps_location', GeoPoint(-33.1, 151.8), kilometers=100)

corehq.apps.es.queries.match_all()[source]

No-op query used because a default must be specified

corehq.apps.es.queries.nested(path, query, *args, **kwargs)[source]

Creates a nested query for use with nested documents

Keyword arguments such as score_mode and others can be added.

corehq.apps.es.queries.nested_filter(path, filter_, *args, **kwargs)[source]

Creates a nested query for use with nested documents

Keyword arguments such as score_mode and others can be added.

corehq.apps.es.queries.search_string_query(search_string, default_fields)[source]

All input defaults to doing an infix search for each term. (This may later change to some kind of fuzzy matching).

This is also available via the main ESQuery class.

Aggregate Queries

Aggregations are a replacement for Facets

Here is an example used to calculate how many new pregnancy cases each user has opened in a certain date range.

res = (CaseES()
       .domain(self.domain)
       .case_type('pregnancy')
       .date_range('opened_on', gte=startdate, lte=enddate)
       .aggregation(TermsAggregation('by_user', 'opened_by'))
       .size(0)
       .run())

buckets = res.aggregations.by_user.buckets
buckets.user1.doc_count

There’s a bit of magic happening here - you can access the raw json data from this aggregation via res.aggregation('by_user') if you’d prefer to skip it.

The res object has an aggregations property, which returns a namedtuple pointing to the wrapped aggregation results. The name provided at instantiation is used here (by_user in this example).

The wrapped aggregation_result object has a result property containing the aggregation data, as well as utilities for parsing that data into something more useful. For example, the TermsAggregation result also has a counts_by_bucket method that returns a {bucket: count} dictionary, which is normally what you want.
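To make the {bucket: count} idea concrete, here is how the raw JSON of a terms aggregation can be reduced to such a dictionary. The sample response is hand-written in the shape Elasticsearch returns for terms aggregations, and counts_by_bucket below is a standalone sketch rather than the method on the result class.

```python
# Hand-written sample in the shape of a terms aggregation response.
raw_aggregations = {
    "by_user": {
        "buckets": [
            {"key": "user1", "doc_count": 3},
            {"key": "user2", "doc_count": 1},
        ]
    }
}


def counts_by_bucket(raw_aggregation):
    """Map each bucket key to its document count."""
    return {b["key"]: b["doc_count"] for b in raw_aggregation["buckets"]}


counts = counts_by_bucket(raw_aggregations["by_user"])
print(counts)
```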

As of this writing, there’s not much else developed, but it’s pretty easy to add support for other aggregation types and more results processing

class corehq.apps.es.aggregations.AggregationRange(start=None, end=None, key=None)[source]

Note that a range includes the “start” value and excludes the “end” value. i.e. start <= X < end

Parameters:
  • start – range start

  • end – range end

  • key – optional key name for the range
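The half-open interval can be checked with a couple of lines. In this sketch, range_to_es builds an entry in the from/to format that Elasticsearch's range aggregation accepts, and falls_in restates the start <= X < end rule; both helper names are hypothetical.

```python
def range_to_es(start=None, end=None, key=None):
    """Build one entry for the ``ranges`` list of a range aggregation."""
    entry = {}
    if start is not None:
        entry["from"] = start
    if end is not None:
        entry["to"] = end
    if key is not None:
        entry["key"] = key
    return entry


def falls_in(value, start=None, end=None):
    """Mirror the documented semantics: start <= value < end."""
    return (start is None or start <= value) and (end is None or value < end)


print(range_to_es(10, 20, key="teens"))
print(falls_in(10, 10, 20), falls_in(20, 10, 20))
```

Because the end is exclusive, adjacent ranges such as 0 to 10 and 10 to 20 never count the same value twice.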

class corehq.apps.es.aggregations.AggregationTerm(name, field)
field

Alias for field number 1

name

Alias for field number 0

class corehq.apps.es.aggregations.AvgAggregation(name, field)[source]
class corehq.apps.es.aggregations.CardinalityAggregation(name, field)[source]
class corehq.apps.es.aggregations.DateHistogram(name, datefield, interval, timezone=None)[source]

Aggregate by date range. This can answer questions like “how many forms were created each day?”.

Parameters:
  • name – what do you want to call this aggregation

  • datefield – the document’s date field to look at

  • interval – the date interval to use - from DateHistogram.Interval

  • timezone – do bucketing using this time zone instead of UTC

__init__(name, datefield, interval, timezone=None)[source]
class corehq.apps.es.aggregations.ExtendedStatsAggregation(name, field, script=None)[source]

Extended stats aggregation that computes an extended stats aggregation by field

class corehq.apps.es.aggregations.FilterAggregation(name, filter)[source]

Bucket aggregation that creates a single bucket for the specified filter

Parameters:
  • name – aggregation name

  • filter – filter body

__init__(name, filter)[source]
class corehq.apps.es.aggregations.FiltersAggregation(name, filters=None)[source]

Bucket aggregation that creates a bucket for each filter specified using the filter name.

Parameters:

name – aggregation name

__init__(name, filters=None)[source]
add_filter(name, filter)[source]
Parameters:
  • name – filter name

  • filter – filter body

class corehq.apps.es.aggregations.GeoBoundsAggregation(name, field)[source]

A metric aggregation that computes the bounding box containing all geo_point values for a field.

More info: Geo Bounds Aggregation

__init__(name, field)[source]
class corehq.apps.es.aggregations.GeohashGridAggregation(name, field, precision)[source]

A multi-bucket aggregation that groups geo_point and geo_shape values into buckets that represent a grid.

More info: Geohash grid aggregation

__init__(name, field, precision)[source]

Initialize a GeohashGridAggregation

Parameters:
  • name – The name of this aggregation

  • field – The case property that stores a geopoint

  • precision – A value between 1 and 12

High precision geohashes have a long string length and represent cells that cover only a small area (similar to long-format ZIP codes like “02139-4075”).

Low precision geohashes have a short string length and represent cells that each cover a large area (similar to short-format ZIP codes like “02139”).

class corehq.apps.es.aggregations.MaxAggregation(name, field)[source]
class corehq.apps.es.aggregations.MinAggregation(name, field)[source]

Bucket aggregation that returns the minimum value of a field

Parameters:
  • name – aggregation name

  • field – name of the field to min

class corehq.apps.es.aggregations.MissingAggregation(name, field)[source]

A field data based single bucket aggregation, that creates a bucket of all documents in the current document set context that are missing a field value (effectively, missing a field or having the configured NULL value set).

Parameters:
  • name – aggregation name

  • field – name of the field to bucket on

__init__(name, field)[source]
class corehq.apps.es.aggregations.NestedAggregation(name, path)[source]

A special single bucket aggregation that enables aggregating nested documents.

Parameters:

path – Path to nested document

__init__(name, path)[source]
class corehq.apps.es.aggregations.NestedTermAggregationsHelper(base_query, terms)[source]

Helper to run nested term-based queries (equivalent to SQL group-by clauses). This is not at all related to the ES ‘nested aggregation’. The final aggregation is a count of documents.

Example usage:

# counting all forms submitted in a domain grouped by app id and user id

NestedTermAggregationsHelper(
    base_query=FormES().domain(domain_name),
    terms=[
        AggregationTerm('app_id', 'app_id'),
        AggregationTerm('user_id', 'form.meta.userID'),
    ]
).get_data()

This works by bucketing docs first by one terms aggregation, then within that bucket, bucketing further by the next term, and so on. This is then flattened out to appear like a group-by-multiple.
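
The flattening step can be sketched in plain Python as below. The response shape follows the usual Elasticsearch nested terms-aggregation format; the function name flatten_buckets and the sample data are made up for the example and are not taken from HQ's implementation.

```python
def flatten_buckets(aggs, term_names, prefix=()):
    """Turn nested terms buckets into flat (key1, key2, ..., count) rows."""
    name, rest = term_names[0], term_names[1:]
    rows = []
    for bucket in aggs[name]["buckets"]:
        key = prefix + (bucket["key"],)
        if rest:
            # Descend into the next level of bucketing.
            rows.extend(flatten_buckets(bucket, rest, key))
        else:
            # Innermost level: emit the accumulated keys plus the doc count.
            rows.append(key + (bucket["doc_count"],))
    return rows


# Hypothetical response: forms bucketed by app_id, then by user_id.
response = {
    "app_id": {"buckets": [
        {"key": "app1", "doc_count": 5, "user_id": {"buckets": [
            {"key": "u1", "doc_count": 3},
            {"key": "u2", "doc_count": 2},
        ]}},
        {"key": "app2", "doc_count": 1, "user_id": {"buckets": [
            {"key": "u1", "doc_count": 1},
        ]}},
    ]},
}

rows = flatten_buckets(response, ["app_id", "user_id"])
print(rows)
```

Each row corresponds to one line of a SQL GROUP BY app_id, user_id result with a COUNT(*) column.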

__init__(base_query, terms)[source]
class corehq.apps.es.aggregations.RangeAggregation(name, field, ranges=None, keyed=True)[source]

Bucket aggregation that creates one bucket for each range

Parameters:
  • name – the aggregation name

  • field – the field to perform the range aggregations on

  • ranges – list of AggregationRange objects

  • keyed – set to True to have the results returned by key instead of as a list (see RangeResult.normalized_buckets)
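
For orientation, the raw Elasticsearch body for a keyed range aggregation looks roughly like the output of the sketch below. The helper range_aggregation_body, the age field, and the bucket keys are assumptions for illustration; RangeAggregation and AggregationRange may assemble the body differently.

```python
def range_aggregation_body(field, ranges, keyed=True):
    """Build a raw ES 'range' aggregation body (hypothetical helper)."""
    # Drop unset bounds so open-ended ranges only carry "from" or "to".
    cleaned = [
        {k: v for k, v in r.items() if v is not None}
        for r in ranges
    ]
    return {"range": {"field": field, "keyed": keyed, "ranges": cleaned}}


body = range_aggregation_body("age", [
    {"key": "minors", "from": None, "to": 18},
    {"key": "adults", "from": 18, "to": None},
])
print(body)
```

With keyed set to True, Elasticsearch returns the buckets as a mapping from each range key to its bucket; otherwise it returns them as a list.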

__init__(name, field, ranges=None, keyed=True)[source]
class corehq.apps.es.aggregations.StatsAggregation(name, field, script=None)[source]

Metric aggregation that computes summary statistics (count, min, max, avg, and sum) for a field

Parameters:
  • name – aggregation name

  • field – name of the field to collect stats on

  • script – an optional field to allow you to script the computed field

__init__(name, field, script=None)[source]
class corehq.apps.es.aggregations.SumAggregation(name, field)[source]

Metric aggregation that sums a field

Parameters:
  • name – aggregation name

  • field – name of the field to sum

__init__(name, field)[source]
class corehq.apps.es.aggregations.TermsAggregation(name, field, size=None, missing=None)[source]

Bucket aggregation that aggregates by field

Parameters:
  • name – aggregation name

  • field – name of the field to bucket on

  • size

  • missing – define how documents that are missing a value should be treated. By default, they will be ignored. If a value is supplied here it will be used where the value is missing.
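
The effect of the missing parameter can be simulated in plain Python. The sketch below mimics terms bucketing over an in-memory list; the function terms_buckets, the case_type field, and the "__none__" placeholder are assumptions for the example, not HQ or Elasticsearch code.

```python
from collections import Counter


def terms_buckets(docs, field, missing=None):
    """Count documents per field value, mimicking a terms aggregation."""
    counts = Counter()
    for doc in docs:
        value = doc.get(field)
        if value is None:
            if missing is None:
                continue  # default: documents without a value are ignored
            value = missing  # otherwise they are counted under this value
        counts[value] += 1
    return dict(counts)


docs = [
    {"case_type": "pregnancy"},
    {"case_type": "child"},
    {},  # no case_type at all
    {"case_type": "pregnancy"},
]

ignored = terms_buckets(docs, "case_type")
bucketed = terms_buckets(docs, "case_type", missing="__none__")
print(ignored, bucketed)
```

Supplying a value for missing is useful when the total across buckets must equal the number of documents.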

__init__(name, field, size=None, missing=None)[source]
class corehq.apps.es.aggregations.TopHitsAggregation(name, field=None, is_ascending=True, size=1, include=None)[source]

A top_hits metric aggregator keeps track of the most relevant document being aggregated. This aggregator is intended to be used as a sub aggregator, so that the top matching documents can be aggregated per bucket.

Parameters:
  • name – Aggregation name

  • field – This is the field to sort the top hits by. If None, defaults to sorting by score.

  • is_ascending – Whether to sort the hits in ascending or descending order.

  • size – The number of hits to include. Defaults to 1.

  • include – An array of fields to include in the hit. Defaults to returning the whole document.

__init__(name, field=None, is_ascending=True, size=1, include=None)[source]
class corehq.apps.es.aggregations.ValueCountAggregation(name, field)[source]

AppES

class corehq.apps.es.apps.AppES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
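
One way such a shortcut could work is sketched below with a toy query class. ToyQuery and its __getattr__ lookup are assumptions made to illustrate the idea; they are not HQ's actual query class.

```python
def term(field, value):
    """Stand-in for filters.term: returns a raw ES term filter."""
    return {"term": {field: value}}


class ToyQuery:
    # Callables listed here become available as instance methods.
    builtin_filters = [term]

    def __init__(self, filters=None):
        self.filters = list(filters or [])

    def filter(self, filter_):
        # Return a new query so that calls can be chained.
        return ToyQuery(self.filters + [filter_])

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        for fn in self.builtin_filters:
            if fn.__name__ == name:
                return lambda *args, **kwargs: self.filter(fn(*args, **kwargs))
        raise AttributeError(name)


short_form = ToyQuery().term("domain", "testproject")
long_form = ToyQuery().filter(term("domain", "testproject"))
print(short_form.filters == long_form.filters)
```

Both spellings produce the same filter list; the short form simply saves the explicit filter(...) wrapper.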

index = 'apps'
class corehq.apps.es.apps.ElasticApp(index_name, type_)[source]
canonical_name = 'apps'
property mapping
property model_cls
settings_key = 'hqapps'
corehq.apps.es.apps.app_id(app_id)[source]
corehq.apps.es.apps.build_comment(comment)[source]
corehq.apps.es.apps.cloudcare_enabled(cloudcare_enabled)[source]
corehq.apps.es.apps.created_from_template(from_template=True)[source]
corehq.apps.es.apps.is_build(build=True)[source]
corehq.apps.es.apps.is_released(released=True)[source]
corehq.apps.es.apps.uses_case_sharing(case_sharing=True)[source]
corehq.apps.es.apps.version(version)[source]

UserES

Here’s an example adapted from the case list report - it gets a list of the ids of all unknown users, web users, and demo users on a domain.

from corehq.apps.es import users as user_es

user_filters = [
    user_es.unknown_users(),
    user_es.web_users(),
    user_es.demo_users(),
]

query = (user_es.UserES()
         .domain(self.domain)
         .OR(*user_filters)
         .show_inactive())

owner_ids = query.get_ids()
class corehq.apps.es.users.ElasticUser(index_name, type_)[source]
canonical_name = 'users'
property mapping
property model_cls
settings_key = 'hqusers'
class corehq.apps.es.users.UserES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

default_filters = {'active': {'term': {'is_active': True}}, 'not_deleted': {'term': {'base_doc': 'couchuser'}}}
index = 'users'
show_inactive()[source]

Include inactive users, which would normally be filtered out.
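
Conceptually, including inactive users amounts to dropping the 'active' entry from default_filters before the query is built. The sketch below models that with plain dictionaries; effective_filters and its removed_defaults argument are hypothetical and only illustrate the idea.

```python
import copy

# Copied from UserES.default_filters as documented above.
DEFAULT_FILTERS = {
    "active": {"term": {"is_active": True}},
    "not_deleted": {"term": {"base_doc": "couchuser"}},
}


def effective_filters(user_filters, removed_defaults=()):
    """Combine the remaining default filters with user-supplied filters."""
    defaults = [
        copy.deepcopy(filter_)
        for name, filter_ in DEFAULT_FILTERS.items()
        if name not in removed_defaults
    ]
    return defaults + list(user_filters)


domain_filter = {"term": {"domain": "testproject"}}
normal = effective_filters([domain_filter])
with_inactive = effective_filters([domain_filter], removed_defaults=("active",))
print(len(normal), len(with_inactive))
```

The not_deleted default still applies in both cases, so deleted users stay excluded.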

show_only_inactive()[source]
corehq.apps.es.users.admin_users()[source]

Return only AdminUsers. Admin users are mock users created from xform submissions with unknown user ids whose username is “admin”.

corehq.apps.es.users.analytics_enabled(enabled=True)[source]
corehq.apps.es.users.created(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.users.demo_users()[source]

Matches users whose username is demo_user

corehq.apps.es.users.domain(domain, allow_enterprise=False)[source]
corehq.apps.es.users.domains(domains)[source]
corehq.apps.es.users.is_active(active=True)[source]
corehq.apps.es.users.is_practice_user(practice_mode=True)[source]
corehq.apps.es.users.last_logged_in(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.users.location(location_id)[source]
corehq.apps.es.users.login_as_user(value)[source]
corehq.apps.es.users.missing_or_empty_user_data_property(property_name)[source]

A user_data property doesn’t exist, or does exist but has an empty string value.

corehq.apps.es.users.mobile_users()[source]
corehq.apps.es.users.query_user_data(key, value)[source]
corehq.apps.es.users.role_id(role_id)[source]
corehq.apps.es.users.unknown_users()[source]

Return only UnknownUsers. Unknown users are mock users created from xform submissions with unknown user ids.

corehq.apps.es.users.user_ids(user_ids)[source]
corehq.apps.es.users.username(username)[source]
corehq.apps.es.users.web_users()[source]

CaseES

Here’s an example getting pregnancy cases that are either still open or were closed on or after May 1st, 2015.

import datetime

from corehq.apps.es import cases as case_es

# Open cases, or cases closed on or after 2015-05-01
q = (case_es.CaseES()
     .domain('testproject')
     .case_type('pregnancy')
     .OR(case_es.is_closed(False),
         case_es.closed_range(gte=datetime.date(2015, 5, 1))))
class corehq.apps.es.cases.CaseES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

index = 'cases'
class corehq.apps.es.cases.ElasticCase(index_name, type_)[source]
canonical_name = 'cases'
property mapping
property model_cls
settings_key = 'hqcases'
corehq.apps.es.cases.active_in_range(gt=None, gte=None, lt=None, lte=None)[source]

Restricts cases returned to those with actions during the range

corehq.apps.es.cases.case_ids(case_ids)[source]
corehq.apps.es.cases.case_name(name)[source]
corehq.apps.es.cases.case_type(type_)[source]
corehq.apps.es.cases.closed_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.is_closed(closed=True)[source]
corehq.apps.es.cases.modified_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.open_case_aggregation(name='open_case', gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.opened_by(user_id)[source]
corehq.apps.es.cases.opened_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.owner(owner_id)[source]
corehq.apps.es.cases.owner_type(owner_type)[source]
corehq.apps.es.cases.server_modified_range(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.touched_total_aggregation(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.cases.user(user_id)[source]
corehq.apps.es.cases.user_ids_handle_unknown(user_ids)[source]
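
Several of the filters above (closed_range, modified_range, opened_range, and so on) share the gt/gte/lt/lte signature. A minimal sketch of how such arguments can map onto an Elasticsearch range filter follows; the helper range_filter and the closed_on field name are assumptions for the example.

```python
import datetime


def range_filter(field, gt=None, gte=None, lt=None, lte=None):
    """Build a raw ES range filter, skipping bounds that were not given."""
    bounds = {"gt": gt, "gte": gte, "lt": lt, "lte": lte}
    body = {}
    for op, value in bounds.items():
        if value is None:
            continue
        if isinstance(value, (datetime.date, datetime.datetime)):
            value = value.isoformat()  # serialize dates as ISO 8601 strings
        body[op] = value
    return {"range": {field: body}}


closed_filter = range_filter("closed_on", gte=datetime.date(2015, 5, 1))
print(closed_filter)
```

Only the bounds that are passed appear in the filter, so a single gte gives an open-ended range.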

FormES

class corehq.apps.es.forms.ElasticForm(index_name, type_)[source]
canonical_name = 'forms'
property mapping
property model_cls
settings_key = 'xforms'
class corehq.apps.es.forms.FormES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

default_filters = {'has_domain': {'exists': {'field': 'domain'}}, 'has_user': {'exists': {'field': 'form.meta.userID'}}, 'has_xmlns': {'exists': {'field': 'xmlns'}}, 'is_xform_instance': {'term': {'doc_type': 'xforminstance'}}}
domain_aggregation()[source]
index = 'forms'
only_archived()[source]

Include only archived forms, which are normally excluded

user_aggregation()[source]
corehq.apps.es.forms.app(app_ids)[source]
corehq.apps.es.forms.completed(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.forms.form_ids(form_ids)[source]
corehq.apps.es.forms.submitted(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.forms.updating_cases(case_ids)[source]

Return only those forms that have case blocks that touch the cases listed in case_ids.

corehq.apps.es.forms.user_id(user_ids)[source]
corehq.apps.es.forms.user_ids_handle_unknown(user_ids)[source]
corehq.apps.es.forms.user_type(user_types)[source]
corehq.apps.es.forms.xmlns(xmlnss)[source]

DomainES

from corehq.apps.es import DomainES

query = (DomainES()
         .in_domains(domains)
         .created(gte=datespan.startdate, lte=datespan.enddate)
         .size(0))
class corehq.apps.es.domains.DomainES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

default_filters = {'not_snapshot': {'bool': {'must_not': {'term': {'is_snapshot': True}}}}}
index = 'domains'
only_snapshots()[source]

Return only snapshots, which are normally excluded.

class corehq.apps.es.domains.ElasticDomain(index_name, type_)[source]
analysis = {'analyzer': {'comma': {'pattern': '\\s*,\\s*', 'type': 'pattern'}, 'default': {'filter': ['lowercase'], 'tokenizer': 'whitespace', 'type': 'custom'}}}
canonical_name = 'domains'
property mapping
property model_cls
settings_key = 'hqdomains'
corehq.apps.es.domains.created(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.domains.created_by_user(creating_user)[source]
corehq.apps.es.domains.in_domains(domains)[source]
corehq.apps.es.domains.incomplete_domains()[source]
corehq.apps.es.domains.is_active(is_active=True)[source]
corehq.apps.es.domains.is_active_project(is_active=True)[source]
corehq.apps.es.domains.last_modified(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.domains.non_test_domains()[source]
corehq.apps.es.domains.real_domains()[source]
corehq.apps.es.domains.self_started()[source]

SMSES

class corehq.apps.es.sms.ElasticSMS(index_name, type_)[source]
canonical_name = 'sms'
property mapping
property model_cls
settings_key = 'smslogs'
class corehq.apps.es.sms.SMSES(index=None, for_export=False)[source]
property builtin_filters

A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))

index = 'sms'
user_aggregation()[source]
corehq.apps.es.sms.direction(direction_)[source]
corehq.apps.es.sms.incoming_messages()[source]
corehq.apps.es.sms.outgoing_messages()[source]
corehq.apps.es.sms.processed(processed=True)[source]
corehq.apps.es.sms.processed_or_incoming_messages()[source]
corehq.apps.es.sms.received(gt=None, gte=None, lt=None, lte=None)[source]
corehq.apps.es.sms.to_commcare_case()[source]
corehq.apps.es.sms.to_commcare_user()[source]
corehq.apps.es.sms.to_commcare_user_or_case()[source]
corehq.apps.es.sms.to_couch_user()[source]
corehq.apps.es.sms.to_web_user()[source]