Elasticsearch
Overview
Indexes
We have indexes for each of the following doc types:
- Applications - hqapps
- Cases - hqcases
- Domains - hqdomains
- Forms - xforms
- Groups - hqgroups
- Users - hqusers
- SMS logs - smslogs
- Case Search - case_search
Each index has a corresponding mapping file in corehq/pillows/mappings/.
Each mapping has a hash that reflects the current state of the mapping. This
can just be a random alphanumeric string.
The hash is appended to the index name so the index is called something like
xforms_1cce1f049a1b4d864c9c25dc42648a45. Each type of index has an alias with
the short name, so you should normally be querying just xforms, not the fully
specified index+hash. All of HQ code except the index maintenance code uses
aliases to read and write data to indices.
Whenever the mapping is changed, this hash should be updated. That will
trigger the creation of a new index on deploy (by the $ ./manage.py
ptop_preindex command). Once the new index is finished, the alias is flipped
($ ./manage.py ptop_es_manage --flip_all_aliases) to point to the new index,
allowing for a relatively seamless transition.
Keeping indexes up-to-date
Pillowtop looks at the changes feed from couch and listens for any relevant new/changed docs. In order to have your changes appear in elasticsearch, pillowtop must be running:
$ ./manage.py run_ptop --all
You can also run a once-off reindex for a specific index:
$ ./manage.py ptop_reindexer_v2 user
Changing a mapping or adding data
If you’re adding additional data to elasticsearch, you’ll need to modify that index’s mapping file in order to be able to query on that new data.
Adding data to an index
Each pillow has a function or class that takes in the raw document dictionary
and transforms it into the document that gets sent to ES. If, for example,
you wanted to store username in addition to user_id on cases in elastic,
you’d add username to corehq.pillows.mappings.case_mapping, then modify the
transform_case_for_elasticsearch function to do the appropriate lookup. It
accepts a doc_dict for the case doc and is expected to return a doc_dict, so
just add the username to that.
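A minimal sketch of that kind of change, assuming a hypothetical
get_username_for_user_id() helper standing in for whatever lookup is
appropriate inside the pillow:
def get_username_for_user_id(user_id):
    # hypothetical helper; in practice this would be a cached couch/SQL lookup
    ...

def transform_case_for_elasticsearch(doc_dict):
    doc = dict(doc_dict)
    # add the username alongside the existing user_id
    doc['username'] = get_username_for_user_id(doc_dict.get('user_id'))
    return doc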
Building the new index
Once you’ve made the change, you’ll need to build a new index which uses
that new mapping. Updating the index name in the mapping file triggers HQ to
create the new index with the new mapping and reindex all data, so you’ll
have to update the index hash and alias at the top of the mapping file.
The hash suffix to the index can just be a random alphanumeric string and,
by convention, is usually the date of the edit. The alias should also be
updated to a new one of the format xforms_<date-modified> (the date is just
by convention), so that production operations continue to use the old alias
pointing to the existing index. This will trigger a preindex as outlined in
the Indexes section. In subsequent commits the alias can be flipped back to
what it was, for example xforms. Changing the alias name doesn’t trigger a
reindex.
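For illustration only, the constants at the top of a mapping file might look
something like the following; the names and values here are hypothetical, so
use the existing files in corehq/pillows/mappings/ as the actual reference:
XFORM_INDEX = "xforms_2024-05-01"   # alias prefix + new hash (date by convention)
XFORM_ALIAS = "xforms_2024-05-01"   # temporary alias; flipped back to "xforms" in a later commit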
Updating indexes in a production environment
Updates in a production environment should be done in two steps, so as not to show incomplete data.
- Set up a release of your branch using cchq <env> setup_limited_release:keep_days=n_days
- In your release directory, kick off an index build using ./manage.py ptop_preindex
- Verify that the reindex has completed successfully.
  - This is a weak point in our current migration process.
  - This can be done by using ES head or the ES APIs to compare document counts to the previous index.
  - You should also actively look for errors in the ptop_preindex command that was run.
- Merge your PR and deploy your latest master branch.
How to un-bork your broken indexes
Sometimes things get in a weird state and (locally!) it’s easiest to just blow away the index and start over.
- Delete the affected index. The easiest way to do this is with elasticsearch-head. You can delete multiple affected indices with curl -X DELETE http://localhost:9200/*. The * can be replaced with a wildcard pattern to delete any matching indices, similar to shell globbing.
- Run $ ./manage.py ptop_preindex && ./manage.py ptop_es_manage --flip_all_aliases.
- Try again.
Querying Elasticsearch - Best Practices
Here are the most basic things to know if you want to write readable and reasonably performant code for accessing Elasticsearch.
Use ESQuery when possible
- Check out Querying Elasticsearch.
- Prefer the cleaner .count(), .values(), .values_list(), etc. execution
methods to the more low-level .run().hits, .run().total, etc. With the latter
it’s easier to make mistakes and fall into anti-patterns, and it’s harder to
read.
- Prefer adding filter methods to using set_query() unless you really know
what you’re doing and are willing to make your code more error prone and
difficult to read.
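For example, a sketch (assuming CaseES is importable from corehq.apps.es; the
domain and case type are illustrative):
from corehq.apps.es import CaseES

query = CaseES().domain("example-domain").case_type("pregnancy")

# preferred: concise and hard to get wrong
total = query.count()

# lower-level equivalent; easier to misuse and harder to read
total = query.size(0).run().total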
Prefer “get” to “search”
Don’t use search to fetch a doc or doc fields by doc id; use “get” instead. Searching by id can easily be an order of magnitude (10x) slower. If done in a loop, this can effectively grind the ES cluster to a halt.
Bad:
POST /hqcases_2016-03-04/case/_search
{
"query": {
"filtered": {
"filter": {
"and": [{"terms": {"_id": [case_id]}}, {"match_all": {}}]
},
"query": {"match_all":{}}
}
},
"_source": ["name"],
"size":1000000
}
Good:
GET /hqcases_2016-03-04/case/<case_id>?_source_include=name
Prefer scroll queries
Use a scroll query when fetching lots of records.
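For example, a sketch (the domain is illustrative):
from corehq.apps.es import FormsES

seen_xmlns = set()
for form in FormsES().domain("example-domain").source(['xmlns']).scroll():
    # each hit is yielded as a dict; the scroll API pages through all matches
    seen_xmlns.add(form.get('xmlns'))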
Prefer filter to query
Don’t use a query when you could use a filter if you don’t need rank.
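For example, a sketch contrasting the two (assuming filters is importable as
corehq.apps.es.filters; the domain is illustrative):
from corehq.apps.es import FormsES
from corehq.apps.es import filters

# preferred: a filter, since relevance ranking isn't needed
matching = FormsES().filter(filters.term("domain", "example-domain"))

# avoid: pushing the same condition into the query clause
matching = FormsES().set_query({"term": {"domain": "example-domain"}})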
Use size(0) with aggregations
Use size(0) when you’re only doing aggregations; otherwise you’ll get back doc
bodies as well! Sometimes that’s just abstractly wasteful, but often it can be
a serious performance hit for the operation as well as the cluster.
The best way to do this is by using helpers like ESQuery’s .count()
that know to do this for you—your code will look better and you won’t have to remember
to check for that every time. (If you ever find helpers not doing this correctly,
then it’s definitely worth fixing.)
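A sketch of a pure-aggregation query (assuming TermsAggregation is importable
from corehq.apps.es.aggregations; the domain is illustrative):
from corehq.apps.es import CaseES
from corehq.apps.es.aggregations import TermsAggregation

query = (CaseES()
         .domain("example-domain")
         .aggregation(TermsAggregation("by_user", "opened_by"))
         .size(0))   # aggregations only; don't return case bodies
buckets = query.run().aggregations.by_user.buckets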
Elasticsearch App
Elasticsearch Index Management
CommCare HQ data in Elasticsearch is integral to core application
functionality. The level to which the application relies on Elasticsearch
data varies from index to index. Currently, Elasticsearch contains both
authoritative data (for example the @indexed_on case property and UnknownUser
user records) and data used for real-time application logic (the users index,
for example).
In order to guarantee stability (or “manageability”, if you will) of this core data, it is important that Elasticsearch indexes are maintained in a consistent state across all environments as a concrete design feature of CommCare HQ. This design constraint is accomplished by managing Elasticsearch index modifications (for example: creating indexes, updating index mappings, etc) exclusively through Django’s migration framework. This ensures that all Elasticsearch index modifications will be part of standard CommCare HQ code deployment procedures, thereby preventing Elasticsearch index state drift between maintained CommCare HQ deployments.
One or more migrations are required any time the following Elasticsearch state configurations are changed in code:
index names
index aliases
analyzers
mappings
tuning parameters
Elasticsearch allows changing an index’s number_of_replicas
tuning parameter
on a live index. In the future, the configuration settings (i.e. “live state”)
of that value should be removed from the CommCare HQ codebase entirely in order
to decouple it from application logic.
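For reference, a sketch of changing that setting on a live index through the
management adapter described later in this document (the index name follows
this document's hypothetical "books" example):
from corehq.apps.es.client import manager

# number_of_replicas can be changed on a live index without a reindex
manager.index_set_replicas("books", 2)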
Creating Elasticsearch Index Migrations
Like Django Model migrations, Elasticsearch index migrations can be quite
verbose. To aid in creating these migrations, there is a Django manage command
that can generate migration files for Elasticsearch index operations. Since
the Elasticsearch index state is not a Django model, Django’s model migration
framework cannot automatically determine what operations need to be included in
a migration, or even when a new migration is required. This is why creating
these migrations is a separate command and not integrated into the default
makemigrations
command.
To create a new Elasticsearch index migration, use the
make_elastic_migration management command and provide details for the
required migration operations via any combination of the -c/--create,
-u/--update and/or -d/--delete command line options.
Similar to Django model migrations, this management command uses the index metadata (mappings, analysis, etc) from the existing Elasticsearch code, so it is important that this command is executed after making changes to index metadata. To provide an example, consider a hypothetical scenario where the following index changes are needed:
- create a new users index
- update the mapping on the existing groups index to add a new property named pending_users
- delete the existing index named groups-sandbox
After the new property has been added to the groups
index mapping in code,
the following management command would create a migration file (e.g.
corehq/apps/es/migrations/0003_groups_pending_users.py
) for the necessary
operations:
./manage.py make_elastic_migration --name groups_pending_users -c users -u groups:pending_users -d groups-sandbox
Updating Elastic Index Mappings
Prior to the UpdateIndexMapping
migration operation implementation, Elastic
mappings were always applied “in full” any time a mapping change was needed.
That is: the entire mapping (from code) was applied to the existing index via
the Put Mapping API. This technique had some pros and cons:
Pro: the mapping update logic in code was simple because it did not have to worry about which existing mapping properties are persistent (persist on the index even if omitted in a PUT request payload) and which ones are volatile (effectively “unset” if omitted in a PUT request payload).
Con: it requires that all mapping properties are explicitly set on every mapping update, making mapping updates impossible if the existing index mapping in Elasticsearch has diverged from the mapping in code.
Because CommCare HQ Elastic mappings have been able to drift between environments, it is no longer possible to update some index mappings using the historical technique. On some indexes, the live index mappings have sufficiently diverged that there is no common, “full mapping definition” that can be applied on all environments. This means that in order to push mapping changes to all environments, new mapping update logic is needed which is capable of updating individual properties on an Elastic index mapping while leaving other (existing) properties unchanged.
The UpdateIndexMapping
migration operation adds this capability. Due to the
complex behavior of the Elasticsearch “Put Mapping” API, this implementation is
limited to only support changing the mapping _meta
and properties
items.
Changing other mapping properties (e.g. date_detection
, dynamic
, etc) is
not yet implemented. However, the current implementation does ensure that the
existing values are retained (unchanged). Historically, these values are rarely
changed, so this limitation does not hinder any kind of routine maintenance
operations. Implementing the ability to change the other properties will be a
simple task when there is a clear definition of how that functionality needs to
work, for example: when a future feature/change requires changing these
properties for a specific reason.
Comparing Mappings In Code Against Live Indexes
When modifying mappings for an existing index, it can be useful to compare the
new mapping (as defined in code) to the live index mappings in Elasticsearch on
a CommCare HQ deployment. This is possible by dumping the mappings of interest
into local files and comparing them with a diff utility. The
print_elastic_mappings
Django manage command makes this process relatively
easy. Minimally, this can be accomplished in as few as three steps:
Export the local code mapping into a new file.
Export the mappings from a deployed environment into a local file.
Compare the two files.
In practice, this might look like the following example:
./manage.py print_elastic_mappings sms --no-names > ./sms-in-code.py
cchq <env> django-manage print_elastic_mappings smslogs_2020-01-28:sms --no-names > ./sms-live.py
diff -u ./sms-live.py ./sms-in-code.py
Elastic Index Tuning Configurations
CommCare HQ provides a mechanism for individual deployments (environments) to
tune the performance characteristics of their Elasticsearch indexes via Django
settings. This mechanism can be used by defining an ES_SETTINGS
dictionary
in localsettings.py
(or by configuring the requisite Elasticsearch
parameters in a CommCare Cloud environment). Tuning parameters can be
specified in one of two ways:
“default”: configures the tuning settings for all indexes in the environment.
index identifier: configures the tuning settings for a specific index in the environment – these settings take precedence over “default” settings.
For example, if an environment wishes to explicitly configure the “case_search”
index with six shards, and all others with only three, the configuration could
be specified in localsettings.py
as:
ES_SETTINGS = {
"default": {"number_of_shards": 3},
"case_search": {"number_of_shards": 6},
}
Configuring a tuning setting with the special value None will result in that
configuration item being reset to the Elasticsearch cluster default (unless
superseded by another setting with higher precedence). Refer to the
corehq/apps/es/index/settings.py file for the full details regarding which
items (index and tuning settings values) are configurable, as well as what
default tuning settings will be used when not customized by the environment.
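For example, to fall back to the cluster default for a single index
(illustrative):
ES_SETTINGS = {
    "case_search": {"number_of_shards": None},  # reset to the Elasticsearch cluster default
}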
Important note: These Elasticsearch index tuning settings are not “live”. That is: changing their values on a deployed environment will not have any immediate effect on live indexes in Elasticsearch. Instead, these values are only ever used when an index is created (for example, during a fresh CommCare HQ installation or when an existing index is reindexed into a new one). This means that making new values become “live” involves an index migration and reindex, which requires changes in the CommCare HQ codebase.
Adapter Design
The HQ Elastic adapter design came about due to the need for reindexing Elasticsearch indexes in a way that is transparent to parts of HQ that write to Elasticsearch (e.g. pillowtop). Reindexing is necessary for making changes to index mappings, is a prerequisite to upgrading an Elasticsearch cluster, and is also needed for changing low-level index configurations (e.g. sharding).
There is an existing procedure draft that documents the steps that were used on
one occasion to reindex the case_search
index. This procedure leveraged a
custom pillow to “backfill” the cloned index (i.e. initially populated using
Elasticsearch Reindex API). That procedure only works for a subset of HQ
Elasticsearch indexes, and is too risky to be considered as an ongoing Elastic
maintenance strategy. There are several key constraints that an HQ reindexing
procedure should meet which the existing procedure does not:
simple and robust
performed with standard maintenance practices
provides the ability to test and verify the integrity of a new index before it is too late to be rejected
allows HQ Elasticsearch index state to remain decoupled from the commcare-cloud codebase
is not disruptive – does not prohibit any other kind of standard maintenance that might come up while the operation is underway
is “fire and forget” – does not require active polling of intermediate state in order to progress the overall operation
is practical for third party HQ hosters to use
One way to accomplish these constraints is to implement an “index multiplexing” feature in HQ, where Elasticsearch write operations are duplicated across two indexes. This design facilitates maintaining two up-to-date versions of any index (a primary read/write index and a secondary write-only index), allowing HQ to run in a “normal” state (i.e. not a custom “maintenance” state) while providing the ability to switch back and forth (swapping primary and secondary) before fully committing to abandoning one of them. Creating a copy of an index is the unavoidable nature of a reindex operation, and multiplexing allows safe switching from one to the other without causing disruptions or outages while keeping both up-to-date.
The least disruptive way to accomplish a multiplexing design is with an adapter
layer that operates between the low-level third party Elasticsearch Python
client library and high-level HQ components which need to read/write data in an
Elasticsearch index. HQ already has the initial framework for this layer (the
ElasticsearchInterface
class), so the adapter layer is not a new concept.
The reason that the ElasticsearchInterface
implementation cannot be modified
in-place to accommodate multiplexing is because it is the wrong level of
abstraction. The ElasticsearchInterface
abstraction layer was designed as an
Elasticsearch version abstraction. It provides a common set of functions and
methods so that the high-level HQ “consumer” that uses it can interact with
Elasticsearch documents without knowing which Elasticsearch version is on the
backend. It is below “index-level” logic, and does not implement index-specific
functionality needed in order for some indexes to be handled differently than
others (e.g. some indexes are indexed individually while others are
multiplexed). The document adapter implementation is a document abstraction
layer. It provides a common set of functions and methods to allow high-level HQ
code to perform Elasticsearch operations at the document level, allowing unique
adapters to handle their document operations differently from index to index.
With a multiplexing adapter layer, reindexing an Elasticsearch index can be as few as four concise steps, none of which are time-critical in respect to each other:
Merge and deploy a PR that configures multiplexing on an index.
Execute an idempotent management command that updates the secondary index from its primary counterpart.
Merge and deploy a PR that disables multiplexing for the index, (now using only the new index).
Execute a management command to delete the old index.
Note: the above steps are not limited to a single index at a time. That is, the implementation does not prohibit configuring multiplexing and reindexing multiple indexes at once.
This reindex procedure is inherently safe because:
At any point in the process, the rollback procedure is a simple code change (i.e. revert PR, deploy).
The operation responsible for populating the secondary index is idempotent and decoupled from the index configuration, allowing it to undergo change iterations without aborting the entire process (thereby losing reindex progress).
Instructions for third party hosters can follow the same process that Dimagi uses, which guarantees that any possible problems encountered by a third party hoster are not outside the Dimagi main track.
Design Details
Reindex Procedure Details
Configure multiplexing on an index by passing a secondary index name to
create_document_adapter.
Ensure that there is a migration in place for creating the index (see Creating Elasticsearch Index Migrations above).
(Optional) If the reindex involves other meta-index changes (shards, mappings, etc), also update those configurations at this time.
Note: currently the adapter does not support reindexing on specific environments, but it could be extended to accommodate that in the future. This support will be added once we get to V5 of ES.
Configure create_document_adapter to return an instance of
ElasticMultiplexAdapter by passing in a secondary index name:
case_adapter = create_document_adapter(
    ElasticCase,
    "hqcases_2016-03-04",
    "case",
    secondary="hqcase_2022-10-20",
)
Add a migration which performs all cluster-level operations required for the new (secondary) index. For example:
creates the new index
configures shards, replicas, etc for the index
sets the index mapping
Review, merge and deploy this change. At Django startup, the new (secondary) index will automatically and immediately begin receiving document writes. Document reads will always come from the primary index.
Execute a management command to sync and verify the secondary index from the primary.
Note: This command is not yet implemented.
This management command is idempotent and performs four operations in serial. If any of the operations complete with unexpected results, the command will abort with an error.
Executes an Elastic reindex request with parameters to populate the secondary
index from the primary, configured to not overwrite existing documents in the
target (secondary) index.
Polls the reindex task progress, blocking until complete.
Note: the reindex API also supports a “blocking” mode which may be advantageous due to limitations in Elasticsearch 2.4’s Task API. As such, this step 2. might be removed in favor of a blocking reindex during the 2.4 –> 5.x upgrade.
Performs a cleanup operation on the secondary index to remove tombstone documents.
Performs a verification pass to check integrity of the secondary index.
Note: An exact verification algorithm has not been designed, and complex verification operations may be left out of the first implementation. The reason it is outlined in this design is to identify that verification is supported and would happen at this point in the process. The initial implementation will at least implement feature-equivalency with the previous process (i.e. ensure document counts are equal between the two indexes), and tentatively an “equivalency check” of document
_id
’s (tentative because checking equality while the multiplexer is running is a race condition).
Example command (not yet implemented):
./manage.py elastic_sync_multiplexed ElasticBook
Perform a primary/secondary “swap” operation one or more times as desired to run a “live test” on the new (secondary) index while keeping the old (primary) index up-to-date.
Reconfigure the adapter by swapping the “primary” and “secondary” index names.
Add a migration that cleans up tombstone documents on the “new primary” index prior to startup.
Note: In theory, this step can be optional (e.g. if the sync procedure becomes sufficiently trusted in the future, or for “goldilox” indexes where rebuilding from source is feasible but advantageous to avoid, etc).
Disable multiplexing for the index.
Reconfigure the document adapter for the index by changing the “primary index name” to the value of the “secondary index name” and remove the secondary configuration (thus reverting the adapter back to a single-index adapter).
Add a migration that cleans up tombstone documents on the index.
Review, merge and deploy this change.
Execute a management command to delete the old index. Example:
./manage.py prune_elastic_index ElasticBook
Elastic Client Adapters
The corehq.apps.es.client
module encapsulates the CommCare HQ Elasticsearch
client adapters. It implements a high-level Elasticsearch client protocol
necessary to accomplish all interactions with the backend Elasticsearch cluster.
Client adapters are split into two usage patterns, the “Management Adapter” and
“Document Adapters”. Client adapters are instantiated at import time in order
to perform index verification when Django starts. Downstream code needing an
adapter imports and uses the adapter instance.
Management Adapter
There is only one management adapter, corehq.apps.es.client.manager
. This
adapter is used for performing all cluster management tasks such as creating and
updating indices and their mappings, changing index settings, changing cluster
settings, etc. This functionality is split into a separate class for a few
reasons:
The management adapter is responsible for low-level Elastic operations which document adapters should never be performing because the scope of a document adapter does not extend beyond a single index.
Elasticsearch 5+ implements security features which limit the kinds of operations a connection can be used for. The separation in these client adapter classes is designed to fit into that model.
from corehq.apps.es.client import manager
manager.index_create("books")
mapping = {"properties": {
"author": {"type": "text"},
"title": {"type": "text"},
"published": {"type": "date"},
}}
manager.index_put_mapping("books", "book", mapping)
manager.index_refresh("books")
manager.index_delete("books")
Document Adapters
Document adapter classes are defined on a per-index basis and include specific
properties and functionality necessary for maintaining a single type of “model”
document in a single index. Each index in Elasticsearch needs to have a
corresponding ElasticDocumentAdapter
subclass which defines how the Python
model is applied to that specific index. At the very least, a document adapter
subclass must define the following:
- A mapping which defines the structure and properties for documents managed by the adapter.
- A from_python() classmethod which can convert a Python model object into the JSON-serializable format for writing into the adapter’s index.
The combination of (index_name, type)
constrains the document adapter to
a specific HQ document mapping. Comparing an Elastic cluster to a Postgres
database (for the sake of analogy), the Elastic index is analogous to a
Postgres schema object (e.g. public
), and the _type
property is
analogous to a Postgres table object. The combination of both index name
and _type
fully constrains the properties that make up a specific Elastic
document.
Document adapters are instantiated once at runtime, via the
create_document_adapter()
function. The purpose of this function is to act
as a shim, returning an ElasticDocumentAdapter
instance or an
ElasticMultiplexAdapter
instance (see
Multiplexing Document Adapters below), depending
on whether or not a secondary index is defined by the secondary
keyword argument.
A simple example of a document model and its corresponding adapter:
class Book:
def __init__(self, isbn, author, title, published):
self.isbn = isbn
self.author = author
self.title = title
self.published = published
class ElasticBook(ElasticDocumentAdapter):
mapping = {"properties": {
"author": {"type": "text"},
"title": {"type": "text"},
"published": {"type": "date"},
}}
@classmethod
def from_python(cls, book):
source = {
"author": book.author,
"title": book.title,
"published": book.published,
}
return book.isbn, source
books_adapter = create_document_adapter(
ElasticBook,
index_name="books",
type_="book",
)
Using this adapter in practice might look as follows:
# index new
new_book = Book(
"978-1491946008",
"Luciano Ramalho",
"Fluent Python: Clear, Concise, and Effective Programming",
datetime.date(2015, 2, 10),
)
books_adapter.index(new_book)
# fetch existing
classic_book = books_adapter.get("978-0345391803")
Multiplexing Document Adapters
The ElasticMultiplexAdapter
is a wrapper around two
ElasticDocumentAdapter
instances: a primary and a secondary. The
multiplexing adapter provides the same public methods as a standard document
adapter, but it performs Elasticsearch write operations against both indexes in
order to keep them in step with document changes. The multiplexing adapter
provides the following functionality:
- All read operations (exists(), get(), search(), etc) are always performed against the primary adapter only. Read requests are never performed against the secondary adapter.
- The update() write method always results in two sequential requests against the underlying indexes:
  - An update request against the primary adapter that simultaneously fetches the full, post-update document body.
  - An upsert update request against the secondary adapter with the document returned in the primary update response.
- All other write operations (index(), delete(), bulk(), etc) leverage the Elasticsearch Bulk API to perform the required operations against both indexes simultaneously in as few requests against the backend as possible (a single request in some cases).
  - The index() method always achieves the index into both indexes with a single request.
  - The delete() method attempts to perform the delete against both indexes in a single request, and will only perform a second request in order to index a tombstone on the secondary (if the primary delete succeeded and the secondary delete failed with a 404 status).
  - The bulk() method (the underlying method for all bulk operations) performs actions against both indexes simultaneously by chunking the actions prior to calling elasticsearch.helpers.bulk() (as opposed to relying on that function to perform the chunking). This allows all bulk actions to be applied against both the primary and secondary indexes in parallel, thereby keeping both indexes synchronized throughout the duration of potentially large (multi-request) bulk operations.
Tombstone
A Tombstone in the ES multiplexer is a placeholder for a document that was
deleted on the primary index before it could be indexed on the secondary
index. Whenever an adapter is multiplexed and a document is deleted, the
secondary index will receive a tombstone entry for that document if and only
if the primary index delete succeeds and the secondary index delete fails due
to a not found condition (404). The Python class that represents these
tombstones is corehq.apps.es.client.Tombstone.
Scenario without tombstones: If a multiplexing adapter deletes a document in the secondary index (which turns out to be a no-op because the document does not exist there yet), and then that same document is copied to the secondary index by the reindexer, then it will exist indefinitely in the secondary even though it has been deleted in the primary.
Put another way:
Reindexer: gets batch of objects from primary index to copy to secondary.
Multiplexer: deletes a document in that batch (in both primary and secondary indexes).
Reindexer: writes deleted (now stale) document into secondary index.
Result: secondary index contains a document that has been deleted.
With tombstones: this will not happen because the reindexer uses an “ignore existing documents” copy mode, so it will never overwrite a tombstone with a stale (deleted) document.
Tombstones will only exist in the secondary index and will be deleted as a final step following a successful sync (reindex) operation. Since tombstones can only be created while the primary and secondary indexes are out of sync (secondary index does not yet contain all primary documents), then once the sync is complete, the multiplexer will no longer create new tombstones.
A sample tombstone document would look like
{
"__is_tombstone__" : True
}
Code Documentation
HQ Elasticsearch client logic (adapters).
- class corehq.apps.es.client.BaseAdapter[source]
Base adapter that includes methods common to all adapters.
- class corehq.apps.es.client.BulkActionItem(op_type, doc=None, doc_id=None)[source]
A wrapper for documents to be processed via Elasticsearch’s Bulk API. Collections of these objects can be passed to an ElasticDocumentAdapter’s
.bulk()
method for processing.Instances of this class are meant to be acquired via one of the factory methods rather than instantiating directly (via
__init__()
).- class OpType(value)
An enumeration.
- classmethod delete_id(doc_id)[source]
Factory method for a document delete action providing only the ID
- property is_delete
True
if this is a delete action, otherwiseFalse
.
- property is_index
True
if this is an index action, otherwiseFalse
.
- class corehq.apps.es.client.ElasticDocumentAdapter(index_name, type_)[source]
Base for subclassing document-specific adapters.
Subclasses must define the following:
mapping
: attribute (dict
)from_python(...)
: classmethod for converting models into Elastic format
- __init__(index_name, type_)[source]
A document adapter for a single index.
- Parameters:
index_name – the name of the index that this adapter interacts with
type – the index
_type
for the mapping
- bulk(actions, refresh=False, raise_errors=True)[source]
Use the Elasticsearch library’s
bulk()
helper function to process documents en masse.Equivalent to the legacy
ElasticsearchInterface.bulk_ops(...)
method.- Parameters:
actions – iterable of
BulkActionItem
instancesrefresh –
bool
refresh the affected shards to make this operation visible to search
raise_errors – whether or not exceptions should be raised if bulk actions fail. The default (
True
) matches that of the elasticsearch-py library’sbulk()
helper function (i.e. raise).
- bulk_delete(doc_ids, **bulk_kw)[source]
Convenience method for bulk deleting many documents by ID without the BulkActionItem boilerplate.
- Parameters:
doc_ids – iterable of document IDs to be deleted
bulk_kw – extra parameters passed verbatim to the
ElasticDocumentAdapter.bulk()
method.
- bulk_index(docs, **bulk_kw)[source]
Convenience method for bulk indexing many documents without the BulkActionItem boilerplate.
- Parameters:
docs – iterable of (Python model) documents to be indexed
bulk_kw – extra parameters passed verbatim to the
ElasticDocumentAdapter.bulk()
method.
- count(query)[source]
Return the number of documents matched by the
query
- Parameters:
query –
dict
query body- Returns:
int
- delete(doc_id, refresh=False)[source]
Delete an existing document from Elasticsearch
Equivalent to the legacy
ElasticsearchInterface.delete_doc(...)
method.- Parameters:
doc_id –
str
ID of the document to deleterefresh –
bool
refresh the affected shards to make this operation visible to search
- delete_tombstones()[source]
Deletes all tombstone documents present in the index
TODO: This should be replaced by delete_by_query https://www.elastic.co/guide/en/elasticsearch/reference/5.1/docs-delete-by-query.html when on ES version >= 5
- exists(doc_id)[source]
Check if a document exists for the provided
doc_id
Equivalent to the legacy
ElasticsearchInterface.doc_exists(...)
method.- Parameters:
doc_id –
str
ID of the document to be checked- Returns:
bool
- export_adapter()[source]
Get an instance of this document adapter configured for “export” queries (i.e. the low-level Elasticsearch client object is configured with longer request timeouts, etc).
- from_python(doc)[source]
Transform a Python model object or model dict into the json-serializable (
dict
) format suitable for indexing in Elasticsearch.- Parameters:
doc – document (instance of a Python model) or a dict representation of that model
- Returns:
tuple
of(doc_id, source_dict)
suitable for being indexed/updated/deleted in Elasticsearch
- get(doc_id, source_includes=None)[source]
Return the document for the provided
doc_id
Equivalent to the legacy
ElasticsearchInterface.get_doc(...)
method.- Parameters:
doc_id –
str
ID of the document to be fetchedsource_includes – a list of fields to extract and return. If
None
(the default), the entire document is returned.
- Returns:
dict
- get_docs(doc_ids)[source]
Return multiple docs for the provided
doc_ids
Equivalent to the legacy
ElasticsearchInterface.get_bulk_docs(...)
method.- Parameters:
doc_ids – iterable of document IDs (
str
’s)- Returns:
dict
- index(doc, refresh=False)[source]
Index (send) a new document in (to) Elasticsearch
Equivalent to the legacy
ElasticsearchInterface.index_doc(...)
method.- Parameters:
doc – the (Python model) document to index
refresh –
bool
refresh the affected shards to make this operation visible to search
- iter_docs(doc_ids, chunk_size=100)[source]
Return a generator which fetches documents in chunks.
- Parameters:
doc_ids – iterable of document IDs (
str
s)chunk_size –
int
number of documents to fetch per query
- Yields:
dict
documents
- scroll(query, scroll='5m', size=None)[source]
Perform a scrolling search, yielding each doc until the entire context is exhausted.
- Parameters:
query –
dict
raw search query.scroll –
str
time value specifying how long the Elastic cluster should keep the search context alive.size –
int
scroll size (number of documents per “scroll” page) When set toNone
(the default), the default scroll size is used.
- Yields:
dict
documents
- search(query, **kw)[source]
Perform a query (search) and return the result.
- Parameters:
query –
dict
search query to executekw – extra parameters passed directly to the underlying
elasticsearch.Elasticsearch.search()
method.
- Returns:
dict
- to_json(doc)[source]
Convenience method that returns the full “from python” document (including the
_id
key, if present) as it would be returned by an adaptersearch
result.This method is not used by the adapter itself, and is only present for other code which wishes to work with documents in a couch-like format.
- Parameters:
doc – document (instance of a Python model)
- update(doc_id, fields, return_doc=False, refresh=False, _upsert=False, retry_on_conflict=None)[source]
Update an existing document in Elasticsearch
Equivalent to the legacy
ElasticsearchInterface.update_doc_fields(...)
method.- Parameters:
doc_id –
str
ID of the document to updatefields –
dict
of name/values to update on the existing Elastic docreturn_doc –
bool
return the full updated doc. WhenFalse
(the default),None
is returned.refresh –
bool
refresh the affected shards to make this operation visible to search
_upsert –
bool
. Only needed for multiplexing, use the index() method instead. Create a new document if one doesn’t already exist. WhenFalse
(the default), performing an update request for a missing document will raise an exception.retry_on_conflict –
int
number of times to retry the update if there is a conflict. Ignored ifNone
(the default). Otherwise, the value it is passed directly to the low-level update() method.
- Returns:
dict
orNone
- class corehq.apps.es.client.ElasticManageAdapter[source]
- cancel_task(task_id)[source]
Cancels a running task in ES
- Parameters:
task_id –
str
ID of the task- Returns:
dict
of task details- Raises:
TaskError
orTaskMissing
(subclass ofTaskError
)
- cluster_routing(*, enabled)[source]
Enable or disable cluster routing.
- Parameters:
enabled –
bool
whether to enable or disable routing
- get_aliases()[source]
Return the cluster aliases information.
- Returns:
dict
with format{<alias>: [<index>, ...], ...}
- get_node_info(node_id, metric)[source]
Return a specific metric from the node info for an Elasticsearch node.
- Parameters:
node_id –
str
ID of the nodemetric –
str
name of the metric to fetch
- Returns:
deserialized JSON (
dict
,list
,str
, etc)
- get_task(task_id)[source]
Return the details for an active task
- Parameters:
task_id –
str
ID of the task- Returns:
dict
of task details- Raises:
TaskError
orTaskMissing
(subclass ofTaskError
)
- index_configure_for_reindex(index)[source]
Update an index with settings optimized for reindexing.
- Parameters:
index –
str
index for which to change the settings
- index_configure_for_standard_ops(index)[source]
Update an index with settings optimized for standard HQ performance.
- Parameters:
index –
str
index for which to change the settings
- index_create(index, metadata=None)[source]
Create a new index.
- Parameters:
index –
str
index namemetadata –
dict
full index metadata (mappings, settings, etc)
- index_exists(index)[source]
Check if
index
refers to a valid index identifier (index name or alias).- Parameters:
index –
str
index name or alias- Returns:
bool
- index_get_mapping(index, type_)[source]
Returns the current mapping for a doc type on an index.
- Parameters:
index –
str
index to fetch the mapping fromtype –
str
doc type to fetch the mapping for
- Returns:
mapping
dict
orNone
if index does not have a mapping
- index_get_settings(index, values=None)[source]
Returns the current settings for an index.
- Parameters:
index –
str
index to fetch settings forvalues – Optional collection of explicit settings to provide in the return value. If
None
(the default) all settings are returned.
- Returns:
dict
- Raises:
KeyError
(only if invalidvalues
are provided)
- index_put_alias(index, name)[source]
Assign an alias to an existing index. This uses the
Elasticsearch.update_aliases()
method to perform both ‘remove’ and ‘add’ actions simultaneously, which is atomic on the server-side. This ensures that the alias is only assigned to one index at a time, and that (if present) an existing alias does not vanish momentarily.See: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html
- Parameters:
index –
str
name of the index to be aliasedname –
str
name of the alias to assign toindex
- index_put_mapping(index, type_, mapping)[source]
Update the mapping for a doc type on an index.
- Parameters:
index –
str
index where the mapping should be updatedtype –
str
doc type to update on the indexmapping –
dict
mapping for the provided doc type
- index_set_replicas(index, replicas)[source]
Set the number of replicas for an index.
- Parameters:
index –
str
index for which to change the replicasreplicas –
int
number of replicas
- index_validate_query(index, query, params={})[source]
Returns True if the passed query is valid, otherwise returns False
- indices_info()[source]
Retrieve meta information about all the indices in the cluster. This will also return closed indices.
- Returns:
dict
A dict with index names as keys and index meta information as values.
- indices_refresh(indices)[source]
Refresh a list of indices.
- Parameters:
indices – iterable of index names or aliases
- reindex(source, dest, wait_for_completion=False, refresh=False, batch_size=1000, requests_per_second=None, copy_doc_ids=True, query=None)[source]
Starts the reindex process in the Elasticsearch cluster
- Parameters:
source –
str
name of the source indexdest –
str
name of the destination indexwait_for_completion –
bool
would block the request until reindex is completerefresh –
bool
refreshes indexrequests_per_second –
int
throttles rate at which reindex issues batches of index operations by padding each batch with a wait time.batch_size –
int
The size of the scroll batch used by the reindex process. Larger batches may process more quickly but risk errors if the documents are too large. 1000 is the recommended maximum and the Elasticsearch default, and can be reduced if you encounter scroll timeouts.
query –
dict
optional parameter to include a term query to filter which documents are included in the reindex
- Returns:
None if wait_for_completion is True, otherwise the task_id of the reindex task
- class corehq.apps.es.client.ElasticMultiplexAdapter(primary_adapter, secondary_adapter)[source]
-
- bulk(actions, refresh=False, raise_errors=True)[source]
Apply bulk actions on the primary and secondary.
Bulk actions are applied against the primary and secondary in chunks of 500 actions at a time (replicates the behavior of the
bulk()
helper function). Chunks are applied against the primary and secondary simultaneously by chunking the originalactions
in blocks of (up to) 250 and performing a single block of (up to) 500 actions against both indexes in parallel. Tombstone documents are indexed on the secondary for anydelete
actions which succeed on the primary but fail on the secondary.
- delete(doc_id, refresh=False)[source]
Delete from both primary and secondary via the
bulk()
method in order to perform both actions in a single HTTP request (two, if a tombstone is required).
- class corehq.apps.es.client.Tombstone(doc_id)[source]
Used to create Tombstone documents in the secondary index when the document from primary index is deleted.
This is required to avoid a potential race condition that might occur when we run the reindex process along with the multiplexer
- corehq.apps.es.client.create_document_adapter(cls, index_name, type_, *, secondary=None)[source]
Creates and returns a document adapter instance for the parameters provided.
Note that the behaviour of the function can be altered with Django settings.
The function will return a multiplexed adapter only if:
- ES_<app name>_INDEX_MULTIPLEXED is True
- a secondary index is provided
The indexes will be swapped only if:
- ES_<app name>_INDEX_SWAPPED is set to True
- a secondary index is provided
If both ES_<app name>_INDEX_MULTIPLEXED and ES_<app name>_INDEX_SWAPPED are set to True then the primary index will act as the secondary index and vice versa.
- Parameters:
cls – an
ElasticDocumentAdapter
subclassindex_name – the name of the index that the adapter interacts with
type – the index
_type
for the adapter’s mapping.secondary – the name of the secondary index in a multiplexing configuration. If an index name is provided and ES_<app name>_INDEX_MULTIPLEXED is set to True, then returned adapter will be an instance of
ElasticMultiplexAdapter
. IfNone
(the default), the returned adapter will be an instance ofcls
. ES_<app name>_INDEX_MULTIPLEXED will be ignored if secondary is None.
- Returns:
a document adapter instance.
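For illustration, assuming a document adapter for an app named “books”, the
relevant Django settings following the ES_<app name>_* pattern above might
look like this (hypothetical names and values):
# hypothetical settings following the pattern described above
ES_BOOKS_INDEX_MULTIPLEXED = True    # write to both primary and secondary indexes
ES_BOOKS_INDEX_SWAPPED = False       # keep primary/secondary roles as configured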
Querying Elasticsearch
ESQuery
ESQuery is a library for building elasticsearch queries in a friendly, more readable manner.
Basic usage
There should be a file and subclass of ESQuery for each index we have.
Each method returns a new object, so you can chain calls together like SQLAlchemy. Here’s an example usage:
q = (FormsES()
.domain(self.domain)
.xmlns(self.xmlns)
.submitted(gte=self.datespan.startdate_param,
lt=self.datespan.enddateparam)
.source(['xmlns', 'domain', 'app_id'])
.sort('received_on', desc=False)
.size(self.pagination.count)
.start(self.pagination.start)
.terms_aggregation('babies.count', 'babies_saved'))
result = q.run()
total_docs = result.total
hits = result.hits
Generally useful filters and queries should be abstracted away for re-use, but you can always add your own like so:
q.filter({"some_arbitrary_filter": {...}})
q.set_query({"fancy_query": {...}})
For debugging or more helpful error messages, you can use query.dumps()
and query.pprint()
, both of which use json.dumps()
and are suitable for
pasting into ES Head or Marvel or whatever.
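For example (the domain is illustrative):
from corehq.apps.es import FormsES

query = FormsES().domain("example-domain")
query.pprint()                       # pretty-print the JSON query
raw_json = query.dumps(pretty=True)  # or get it as a string to paste elsewhere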
Filtering
Filters are implemented as standalone functions, so they can be composed and
nested, e.g. q.OR(web_users(), mobile_users()).
Filters can be passed to the query.filter
method: q.filter(web_users())
There is some syntactic sugar that lets you skip this boilerplate and just
call the filter as if it were a method on the query class: q.web_users()
In order to be available for this shorthand, filters are added to the
builtin_filters
property of the main query class.
I know that’s a bit confusing, but it seemed like the best way to make filters
available in both contexts.
Generic filters applicable to all indices are available in
corehq.apps.es.filters
. (But most/all can also be accessed as a query
method, if appropriate)
Filtering Specific Indices
There is a file for each elasticsearch index (if not, feel free to add one). This file provides filters specific to that index, as well as an appropriately-directed ESQuery subclass with references to these filters.
These index-specific query classes also have default filters to exclude things
like inactive users or deleted docs.
These things should nearly always be excluded, but if necessary, you can remove
these with remove_default_filters
.
Language
es_query - the entire query, filters, query, pagination
filters - a list of the individual filters
query - the query, used for searching, not filtering
field - a field on the document. User docs have a ‘domain’ field.
lt/gt - less/greater than
lte/gte - less/greater than or equal to
- class corehq.apps.es.es_query.ESQuery(index=None, for_export=False)[source]
This query builder only outputs the following query structure:
{ "query": { "bool": { "filter": { "and": [ <filters> ] }, "query": <query> } }, <size, sort, other params> }
- __init__(index=None, for_export=False)[source]
- add_query(new_query, clause)[source]
Add a query to the current list of queries
- aggregation(aggregation)[source]
Add the passed-in aggregation to the query
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do
self.term(field, value)
instead ofself.filter(filters.term(field, value))
- dumps(pretty=False)[source]
Returns the JSON query that will be sent to elasticsearch.
- exclude_source()[source]
Turn off _source retrieval. Mostly useful if you just want the doc_ids
- fields(fields)[source]
Restrict the fields returned from elasticsearch
Deprecated. Use source instead.
- filter(filter)[source]
Add the passed-in filter to the query. All filtering goes through this class.
- property filters
Return a list of the filters used in this query, suitable if you want to reproduce a query with additional filtering.
- get_ids()[source]
Performs a minimal query to get the ids of the matching documents
For very large sets of IDs, use
scroll_ids
instead
- nested_sort(path, field_name, nested_filter, desc=False, reset_sort=True, sort_missing=None)[source]
Order results by the value of a nested field
- pprint()[source]
pretty prints the JSON query that will be sent to elasticsearch.
- remove_default_filter(default)[source]
Remove a specific default filter by passing in its name.
- remove_default_filters()[source]
Sensible defaults are provided. Use this if you don’t want ‘em
- run()[source]
Actually run the query. Returns an ESQuerySet object.
- scroll()[source]
Run the query against the scroll api. Returns an iterator yielding each document that matches the query.
- scroll_ids()[source]
Returns a generator of all matching ids
- scroll_ids_to_disk_and_iter_docs()[source]
Returns a
ScanResult
for all matched documents.Used for iterating docs for a very large query where consuming the docs via
self.scroll()
may exceed the amount of time that the scroll context can remain open. This is achieved by:
- Fetching the IDs for all matched documents (via
scroll_ids()
) and caching them in a temporary file on disk, then
- fetching the documents by (chunked blocks of) IDs streamed from the temporary file.
Original design PR: https://github.com/dimagi/commcare-hq/pull/20282
Caveats:
- There is no guarantee that the returned ScanResult’s count property will match the number of yielded docs.
- Documents that are present when scroll_ids() is called, but are deleted prior to being fetched in full, will be missing from the results, and this scenario will not raise an exception.
- If Elastic document ID values are ever reused (i.e. new documents are created with the same ID of a previously-deleted document) then this method would become unsafe because it could yield documents that were not matched by the query.
- search_string_query(search_string, default_fields)[source]
Accepts a user-defined search string
- set_query(query)[source]
Set the query. Most stuff we want is better done with filters, but if you actually want Levenshtein distance or prefix querying…
- set_sorting_block(sorting_block)[source]
To be used with get_sorting_block, which interprets datatables sorting
- size(size)[source]
Restrict number of results returned. Analogous to SQL limit, except when performing a scroll, in which case this value becomes the number of results to fetch per scroll request.
- sort(field, desc=False, reset_sort=True)[source]
Order the results by field.
- source(include, exclude=None)[source]
Restrict the output of _source in the queryset. This can be used to return an object in a queryset
- start(start)[source]
Pagination. Analogous to SQL offset.
- values(*fields)[source]
modeled after django’s QuerySet.values
- class corehq.apps.es.es_query.ESQuerySet(raw, query)[source]
- The object returned from
ESQuery.run
ESQuerySet.raw is the raw response from elasticsearch
ESQuerySet.query is the ESQuery object
- __init__(raw, query)[source]
- property doc_ids
Return just the docs ids from the response.
- property hits
Return the docs from the response.
- static normalize_result(query, result)[source]
Return the doc from an item in the query response.
- property total
Return the total number of docs matching the query.
- The object returned from
- class corehq.apps.es.es_query.HQESQuery(index=None, for_export=False)[source]
Query logic specific to CommCare HQ
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do
self.term(field, value)
instead ofself.filter(filters.term(field, value))
- exception corehq.apps.es.es_query.InvalidQueryError[source]
Query parameters cannot be assembled into a valid search.
Available Filters
The following filters are available on any ESQuery instance - you can chain any of these on your query.
Note also that the term
filter accepts either a list or a single element.
Simple filters which match against a field are based on this filter, so those
will also accept lists.
That means you can do form_query.xmlns(XMLNS1)
or
form_query.xmlns([XMLNS1, XMLNS2, ...])
.
Contributing:
Additions to this file should be added to the builtin_filters
method on
either ESQuery or HQESQuery, as appropriate (is it an HQ thing?).
- corehq.apps.es.filters.AND(*filters)[source]
Filter docs to match all of the filters passed in
- corehq.apps.es.filters.NOT(filter_)[source]
Exclude docs matching the filter passed in
- corehq.apps.es.filters.OR(*filters)[source]
Filter docs to match any of the filters passed in
- corehq.apps.es.filters.date_range(field, gt=None, gte=None, lt=None, lte=None)[source]
Range filter that accepts date and datetime objects as arguments
- corehq.apps.es.filters.doc_id(doc_id)[source]
Filter by doc_id. Also accepts a list of doc ids
- corehq.apps.es.filters.doc_type(doc_type)[source]
Filter by doc_type. Also accepts a list
- corehq.apps.es.filters.domain(domain_name)[source]
Filter by domain.
- corehq.apps.es.filters.empty(field)[source]
Only return docs with a missing or null value for
field
- corehq.apps.es.filters.exists(field)[source]
Only return docs which have a value for
field
- corehq.apps.es.filters.geo_bounding_box(field, top_left, bottom_right)[source]
Only return geopoints stored in
field
that are located within the bounding box defined bytop_left
andbottom_right
.top_left
andbottom_right
accept a range of data types and formats.More info: Geo Bounding Box Query
- corehq.apps.es.filters.geo_grid(field, geohash)[source]
Filters cases by the geohash grid cell in which they are located.
- corehq.apps.es.filters.geo_polygon(field, points)[source]
Filters
geo_point
values infield
that fall within the polygon described by the list ofpoints
.More info: Geo Polygon Query
- Parameters:
field – A field with Elasticsearch data type
geo_point
.points – A list of points that describe a polygon. Elasticsearch supports a range of formats for list items.
- Returns:
A filter dict.
- corehq.apps.es.filters.geo_shape(field, shape, relation='intersects')[source]
Filters cases by case properties indexed using the
geo_point
type.More info: The Geoshape query reference
- Parameters:
field – The field where geopoints are stored
shape – A shape definition given in GeoJSON geometry format. More info: The GeoJSON specification (RFC 7946)
relation – The relation between the shape and the case property values.
- Returns:
A filter definition
- corehq.apps.es.filters.missing(field)[source]
Only return docs missing a value for
field
- corehq.apps.es.filters.nested(path, filter_)[source]
Query nested documents which normally can’t be queried directly
- corehq.apps.es.filters.non_null(field)[source]
Only return docs with a real, non-null value for
field
- corehq.apps.es.filters.range_filter(field, gt=None, gte=None, lt=None, lte=None)[source]
Filter
field
by a range. Pass in some sensible combination ofgt
(greater than),gte
(greater than or equal to),lt
, andlte
.
- corehq.apps.es.filters.term(field, value)[source]
Filter docs by a field; ‘value’ can be a singleton or a list.
Available Queries
Queries are used for actual searching - things like relevancy scores, Levenstein distance, and partial matches.
View the elasticsearch documentation to see what other options are available, and put ‘em here if you end up using any of ‘em.
- corehq.apps.es.queries.filtered(query, filter_)[source]
Filtered query for performing both filtering and querying at once
- corehq.apps.es.queries.geo_distance(field, geopoint, **kwargs)[source]
Filters cases to those within a certain distance of the provided geopoint
eg: geo_distance(‘gps_location’, GeoPoint(-33.1, 151.8), kilometers=100)
- corehq.apps.es.queries.match_all()[source]
No-op query used because a default must be specified
- corehq.apps.es.queries.nested(path, query, *args, **kwargs)[source]
Creates a nested query for use with nested documents
Keyword arguments such as score_mode and others can be added.
- corehq.apps.es.queries.nested_filter(path, filter_, *args, **kwargs)[source]
Creates a nested query for use with nested documents
Keyword arguments such as score_mode and others can be added.
- corehq.apps.es.queries.search_string_query(search_string, default_fields)[source]
All input defaults to doing an infix search for each term. (This may later change to some kind of fuzzy matching).
This is also available via the main ESQuery class.
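For instance, a sketch of an infix search on users, using the instance-method form mentioned above (the field names are illustrative):
from corehq.apps.es import UserES

query = (UserES()
         .domain('my-domain')
         .search_string_query('maria', default_fields=['first_name', 'last_name']))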
Aggregate Queries
Aggregations are a replacement for Facets
Here is an example used to calculate how many new pregnancy cases each user has opened in a certain date range.
res = (CaseES()
       .domain(self.domain)
       .case_type('pregnancy')
       .date_range('opened_on', gte=startdate, lte=enddate)
       .aggregation(TermsAggregation('by_user', 'opened_by'))
       .size(0)
       .run())
buckets = res.aggregations.by_user.buckets
buckets.user1.doc_count
There’s a bit of magic happening here - you can access the raw json data from
this aggregation via res.aggregation('by_user')
if you’d prefer to skip it.
The res object has an aggregations property, which returns a namedtuple pointing to the wrapped aggregation results. The name provided at instantiation is used here (by_user in this example).
The wrapped aggregation_result object has a result property containing the aggregation data, as well as utilities for parsing that data into something more useful. For example, the TermsAggregation result also has a counts_by_bucket method that returns a {bucket: count} dictionary, which is normally what you want.
As of this writing, there’s not much else developed, but it’s pretty easy to add support for other aggregation types and more results processing.
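For instance, continuing the example above, a quick sketch of pulling the counts directly:
# {user_id: count} dictionary for the 'by_user' aggregation
res.aggregations.by_user.counts_by_bucket()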
- class corehq.apps.es.aggregations.AggregationRange(start=None, end=None, key=None)[source]
Note that a range includes the “start” value and excludes the “end” value. i.e. start <= X < end
- Parameters:
start – range start
end – range end
key – optional key name for the range
- class corehq.apps.es.aggregations.AggregationTerm(name, field)
- field
Alias for field number 1
- name
Alias for field number 0
- class corehq.apps.es.aggregations.AvgAggregation(name, field)[source]
- class corehq.apps.es.aggregations.CardinalityAggregation(name, field)[source]
- class corehq.apps.es.aggregations.DateHistogram(name, datefield, interval, timezone=None)[source]
Aggregate by date range. This can answer questions like “how many forms were created each day?”.
- Parameters:
name – what do you want to call this aggregation
datefield – the document’s date field to look at
interval – the date interval to use - from DateHistogram.Interval
timezone – do bucketing using this time zone instead of UTC
- __init__(name, datefield, interval, timezone=None)[source]
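A sketch counting forms per month in a domain, assuming a received_on date field and that DateHistogram.Interval provides a MONTH value:
res = (FormES()
       .domain('my-domain')
       .aggregation(DateHistogram('by_month', 'received_on', DateHistogram.Interval.MONTH))
       .size(0)
       .run())
buckets = res.aggregations.by_month.buckets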
- class corehq.apps.es.aggregations.ExtendedStatsAggregation(name, field, script=None)[source]
Extended stats aggregation that computes an extended stats aggregation by field
- class corehq.apps.es.aggregations.FilterAggregation(name, filter)[source]
Bucket aggregation that creates a single bucket for the specified filter
- Parameters:
name – aggregation name
filter – filter body
- __init__(name, filter)[source]
- class corehq.apps.es.aggregations.FiltersAggregation(name, filters=None)[source]
Bucket aggregation that creates a bucket for each filter specified using the filter name.
- Parameters:
name – aggregation name
- __init__(name, filters=None)[source]
- add_filter(name, filter)[source]
- Parameters:
name – filter name
filter – filter body
- class corehq.apps.es.aggregations.GeoBoundsAggregation(name, field)[source]
A metric aggregation that computes the bounding box containing all geo_point values for a field.
More info: Geo Bounds Aggregation
- __init__(name, field)[source]
- class corehq.apps.es.aggregations.GeohashGridAggregation(name, field, precision)[source]
A multi-bucket aggregation that groups geo_point and geo_shape values into buckets that represent a grid. More info: Geohash grid aggregation
- __init__(name, field, precision)[source]
Initialize a GeohashGridAggregation
- Parameters:
name – The name of this aggregation
field – The case property that stores a geopoint
precision – A value between 1 and 12
High precision geohashes have a long string length and represent cells that cover only a small area (similar to long-format ZIP codes like “02139-4075”).
Low precision geohashes have a short string length and represent cells that each cover a large area (similar to short-format ZIP codes like “02139”).
- class corehq.apps.es.aggregations.MaxAggregation(name, field)[source]
- class corehq.apps.es.aggregations.MinAggregation(name, field)[source]
Bucket aggregation that returns the minimum value of a field
- Parameters:
name – aggregation name
field – name of the field to min
- class corehq.apps.es.aggregations.MissingAggregation(name, field)[source]
A field data based single bucket aggregation, that creates a bucket of all documents in the current document set context that are missing a field value (effectively, missing a field or having the configured NULL value set).
- Parameters:
name – aggregation name
field – name of the field to bucket on
- __init__(name, field)[source]
- class corehq.apps.es.aggregations.NestedAggregation(name, path)[source]
A special single bucket aggregation that enables aggregating nested documents.
- Parameters:
path – Path to nested document
- __init__(name, path)[source]
- class corehq.apps.es.aggregations.NestedTermAggregationsHelper(base_query, terms)[source]
Helper to run nested term-based queries (equivalent to SQL group-by clauses). This is not at all related to the ES ‘nested aggregation’. The final aggregation is a count of documents.
Example usage:
# counting all forms submitted in a domain grouped by app id and user id
NestedTermAggregationsHelper(
    base_query=FormES().domain(domain_name),
    terms=[
        AggregationTerm('app_id', 'app_id'),
        AggregationTerm('user_id', 'form.meta.userID'),
    ]
).get_data()
This works by bucketing docs first by one terms aggregation, then within that bucket, bucketing further by the next term, and so on. This is then flattened out to appear like a group-by-multiple.
- __init__(base_query, terms)[source]
- class corehq.apps.es.aggregations.RangeAggregation(name, field, ranges=None, keyed=True)[source]
Bucket aggregation that creates one bucket for each range
- Parameters:
name – the aggregation name
field – the field to perform the range aggregations on
ranges – list of AggregationRange objects
keyed – set to True to have the results returned by key instead of as a list (see RangeResult.normalized_buckets)
- __init__(name, field, ranges=None, keyed=True)[source]
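A sketch bucketing cases by a hypothetical numeric 'age' case property:
age_agg = RangeAggregation('age_group', 'age', ranges=[
    AggregationRange(end=18, key='minor'),    # age < 18
    AggregationRange(start=18, key='adult'),  # age >= 18
])
res = CaseES().domain('my-domain').aggregation(age_agg).size(0).run()
res.aggregations.age_group.normalized_buckets  # keyed results, since keyed=True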
- class corehq.apps.es.aggregations.StatsAggregation(name, field, script=None)[source]
Stats aggregation that computes a stats aggregation by field
- Parameters:
name – aggregation name
field – name of the field to collect stats on
script – an optional field to allow you to script the computed field
- __init__(name, field, script=None)[source]
- class corehq.apps.es.aggregations.SumAggregation(name, field)[source]
Bucket aggregation that sums a field
- Parameters:
name – aggregation name
field – name of the field to sum
- __init__(name, field)[source]
- class corehq.apps.es.aggregations.TermsAggregation(name, field, size=None, missing=None)[source]
Bucket aggregation that aggregates by field
- Parameters:
name – aggregation name
field – name of the field to bucket on
size –
missing – define how documents that are missing a value should be treated. By default, they will be ignored. If a value is supplied here it will be used where the value is missing.
- __init__(name, field, size=None, missing=None)[source]
- class corehq.apps.es.aggregations.TopHitsAggregation(name, field=None, is_ascending=True, size=1, include=None)[source]
A top_hits metric aggregator keeps track of the most relevant document being aggregated. This aggregator is intended to be used as a sub aggregator, so that the top matching documents can be aggregated per bucket.
- Parameters:
name – Aggregation name
field – This is the field to sort the top hits by. If None, defaults to sorting by score.
is_ascending – Whether to sort the hits in ascending or descending order.
size – The number of hits to include. Defaults to 1.
include – An array of fields to include in the hit. Defaults to returning the whole document.
- __init__(name, field=None, is_ascending=True, size=1, include=None)[source]
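A sketch of TopHitsAggregation as a sub-aggregation that grabs the most recent form per user (the field names are illustrative, and this assumes sub-aggregations are attached with .aggregation(), as elsewhere in this module):
agg = TermsAggregation('by_user', 'form.meta.userID').aggregation(
    TopHitsAggregation('latest_form', 'received_on', is_ascending=False, size=1))
res = FormES().domain('my-domain').aggregation(agg).size(0).run()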
- class corehq.apps.es.aggregations.ValueCountAggregation(name, field)[source]
AppES
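A minimal sketch that fetches the ids of the non-build application docs in a domain (the domain name is illustrative):
from corehq.apps.es import apps as app_es

app_ids = (app_es.AppES()
           .domain('my-domain')
           .is_build(False)
           .get_ids())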
- class corehq.apps.es.apps.AppES(index=None, for_export=False)[source]
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
- index = 'apps'
- class corehq.apps.es.apps.ElasticApp(index_name, type_)[source]
- canonical_name = 'apps'
- property mapping
- property model_cls
- settings_key = 'hqapps'
- corehq.apps.es.apps.app_id(app_id)[source]
- corehq.apps.es.apps.build_comment(comment)[source]
- corehq.apps.es.apps.cloudcare_enabled(cloudcare_enabled)[source]
- corehq.apps.es.apps.created_from_template(from_template=True)[source]
- corehq.apps.es.apps.is_build(build=True)[source]
- corehq.apps.es.apps.is_released(released=True)[source]
- corehq.apps.es.apps.uses_case_sharing(case_sharing=True)[source]
- corehq.apps.es.apps.version(version)[source]
UserES
Here’s an example adapted from the case list report - it gets a list of the ids of all unknown users, web users, and demo users on a domain.
from corehq.apps.es import users as user_es
user_filters = [
user_es.unknown_users(),
user_es.web_users(),
user_es.demo_users(),
]
query = (user_es.UserES()
.domain(self.domain)
.OR(*user_filters)
.show_inactive())
owner_ids = query.get_ids()
- class corehq.apps.es.users.ElasticUser(index_name, type_)[source]
- canonical_name = 'users'
- property mapping
- property model_cls
- settings_key = 'hqusers'
- class corehq.apps.es.users.UserES(index=None, for_export=False)[source]
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
- default_filters = {'active': {'term': {'is_active': True}}, 'not_deleted': {'term': {'base_doc': 'couchuser'}}}
- index = 'users'
- show_inactive()[source]
Include inactive users, which would normally be filtered out.
- show_only_inactive()[source]
- corehq.apps.es.users.admin_users()[source]
Return only AdminUsers. Admin users are mock users created from xform submissions with unknown user ids whose username is “admin”.
- corehq.apps.es.users.analytics_enabled(enabled=True)[source]
- corehq.apps.es.users.created(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.users.demo_users()[source]
Matches users whose username is demo_user
- corehq.apps.es.users.domain(domain, allow_enterprise=False)[source]
- corehq.apps.es.users.domains(domains)[source]
- corehq.apps.es.users.is_active(active=True)[source]
- corehq.apps.es.users.is_practice_user(practice_mode=True)[source]
- corehq.apps.es.users.last_logged_in(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.users.location(location_id)[source]
- corehq.apps.es.users.login_as_user(value)[source]
- corehq.apps.es.users.missing_or_empty_user_data_property(property_name)[source]
A user_data property doesn’t exist, or does exist but has an empty string value.
- corehq.apps.es.users.mobile_users()[source]
- corehq.apps.es.users.query_user_data(key, value)[source]
- corehq.apps.es.users.role_id(role_id)[source]
- corehq.apps.es.users.unknown_users()[source]
Return only UnknownUsers. Unknown users are mock users created from xform submissions with unknown user ids.
- corehq.apps.es.users.user_ids(user_ids)[source]
- corehq.apps.es.users.username(username)[source]
- corehq.apps.es.users.web_users()[source]
CaseES
Here’s an example getting pregnancy cases that are either still open or were closed after May 1st.
from corehq.apps.es import cases as case_es
q = (case_es.CaseES()
.domain('testproject')
.case_type('pregnancy')
.OR(case_es.is_closed(False),
case_es.closed_range(gte=datetime.date(2015, 5, 1))))
- class corehq.apps.es.cases.CaseES(index=None, for_export=False)[source]
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
- index = 'cases'
- class corehq.apps.es.cases.ElasticCase(index_name, type_)[source]
- canonical_name = 'cases'
- property mapping
- property model_cls
- settings_key = 'hqcases'
- corehq.apps.es.cases.active_in_range(gt=None, gte=None, lt=None, lte=None)[source]
Restricts cases returned to those with actions during the range
- corehq.apps.es.cases.case_ids(case_ids)[source]
- corehq.apps.es.cases.case_name(name)[source]
- corehq.apps.es.cases.case_type(type_)[source]
- corehq.apps.es.cases.closed_range(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.cases.is_closed(closed=True)[source]
- corehq.apps.es.cases.modified_range(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.cases.open_case_aggregation(name='open_case', gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.cases.opened_by(user_id)[source]
- corehq.apps.es.cases.opened_range(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.cases.owner(owner_id)[source]
- corehq.apps.es.cases.owner_type(owner_type)[source]
- corehq.apps.es.cases.server_modified_range(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.cases.touched_total_aggregation(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.cases.user(user_id)[source]
- corehq.apps.es.cases.user_ids_handle_unknown(user_ids)[source]
FormES
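Here's a sketch in the same spirit as the other examples: counting forms submitted in a domain for one xmlns over a date range (the xmlns value and the date variables are illustrative):
from corehq.apps.es import forms as form_es

count = (form_es.FormES()
         .domain('my-domain')
         .xmlns('http://openrosa.org/formdesigner/my-form')
         .submitted(gte=startdate, lt=enddate)
         .count())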
- class corehq.apps.es.forms.ElasticForm(index_name, type_)[source]
- canonical_name = 'forms'
- property mapping
- property model_cls
- settings_key = 'xforms'
- class corehq.apps.es.forms.FormES(index=None, for_export=False)[source]
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
- default_filters = {'has_domain': {'exists': {'field': 'domain'}}, 'has_user': {'exists': {'field': 'form.meta.userID'}}, 'has_xmlns': {'exists': {'field': 'xmlns'}}, 'is_xform_instance': {'term': {'doc_type': 'xforminstance'}}}
- domain_aggregation()[source]
- index = 'forms'
- only_archived()[source]
Include only archived forms, which are normally excluded
- user_aggregation()[source]
- corehq.apps.es.forms.app(app_ids)[source]
- corehq.apps.es.forms.completed(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.forms.form_ids(form_ids)[source]
- corehq.apps.es.forms.submitted(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.forms.updating_cases(case_ids)[source]
Return only those forms that have case blocks that touch the cases listed in case_ids
- corehq.apps.es.forms.user_id(user_ids)[source]
- corehq.apps.es.forms.user_ids_handle_unknown(user_ids)[source]
- corehq.apps.es.forms.user_type(user_types)[source]
- corehq.apps.es.forms.xmlns(xmlnss)[source]
DomainES
from corehq.apps.es import DomainES
query = (DomainES()
.in_domains(domains)
.created(gte=datespan.startdate, lte=datespan.enddate)
.size(0))
- class corehq.apps.es.domains.DomainES(index=None, for_export=False)[source]
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
- default_filters = {'not_snapshot': {'bool': {'must_not': {'term': {'is_snapshot': True}}}}}
- index = 'domains'
- only_snapshots()[source]
Normally snapshots are excluded; this returns only snapshots instead
- class corehq.apps.es.domains.ElasticDomain(index_name, type_)[source]
- analysis = {'analyzer': {'comma': {'pattern': '\\s*,\\s*', 'type': 'pattern'}, 'default': {'filter': ['lowercase'], 'tokenizer': 'whitespace', 'type': 'custom'}}}
- canonical_name = 'domains'
- property mapping
- property model_cls
- settings_key = 'hqdomains'
- corehq.apps.es.domains.created(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.domains.created_by_user(creating_user)[source]
- corehq.apps.es.domains.in_domains(domains)[source]
- corehq.apps.es.domains.incomplete_domains()[source]
- corehq.apps.es.domains.is_active(is_active=True)[source]
- corehq.apps.es.domains.is_active_project(is_active=True)[source]
- corehq.apps.es.domains.last_modified(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.domains.non_test_domains()[source]
- corehq.apps.es.domains.real_domains()[source]
- corehq.apps.es.domains.self_started()[source]
SMSES
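A sketch counting the incoming SMS received in a domain over a date range (the date variables are illustrative):
from corehq.apps.es import sms as sms_es

count = (sms_es.SMSES()
         .domain('my-domain')
         .incoming_messages()
         .received(gte=startdate, lte=enddate)
         .count())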
- class corehq.apps.es.sms.ElasticSMS(index_name, type_)[source]
- canonical_name = 'sms'
- property mapping
- property model_cls
- settings_key = 'smslogs'
- class corehq.apps.es.sms.SMSES(index=None, for_export=False)[source]
- property builtin_filters
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value))
- index = 'sms'
- user_aggregation()[source]
- corehq.apps.es.sms.direction(direction_)[source]
- corehq.apps.es.sms.incoming_messages()[source]
- corehq.apps.es.sms.outgoing_messages()[source]
- corehq.apps.es.sms.processed(processed=True)[source]
- corehq.apps.es.sms.processed_or_incoming_messages()[source]
- corehq.apps.es.sms.received(gt=None, gte=None, lt=None, lte=None)[source]
- corehq.apps.es.sms.to_commcare_case()[source]
- corehq.apps.es.sms.to_commcare_user()[source]
- corehq.apps.es.sms.to_commcare_user_or_case()[source]
- corehq.apps.es.sms.to_couch_user()[source]
- corehq.apps.es.sms.to_web_user()[source]