Elasticsearch¶
Overview¶
Indexes¶
We have indexes for each of the following doc types:
- Applications - hqapps
- Cases - hqcases
- Domains - hqdomains
- Forms - xforms
- Groups - hqgroups
- Users - hqusers
- Report Cases - report_cases
- Report Forms - report_xforms
- SMS logs - smslogs
- Case Search - case_search
The Report cases and forms indexes are only configured to run for a few domains, and they store additional mappings allowing you to query on form and case properties (not just metadata).
Each index has a corresponding mapping file in corehq/pillows/mappings/.
Each mapping has a hash that reflects the current state of the mapping. This can just be a random alphanumeric string.
The hash is appended to the index name, so the index is called something like xforms_1cce1f049a1b4d864c9c25dc42648a45. Each type of index has an alias with the short name, so you should normally be querying just xforms, not the fully specified index+hash. All HQ code except the index maintenance code uses aliases to read and write data to indices.
Whenever the mapping is changed, this hash should be updated. That will trigger the creation of a new index on deploy (by the $ ./manage.py ptop_preindex command). Once the new index is finished, the alias is flipped ($ ./manage.py ptop_es_manage --flip_all_aliases) to point to the new index, allowing for a relatively seamless transition.
Keeping indexes up-to-date¶
Pillowtop looks at the changes feed from couch and listens for any relevant new/changed docs. In order to have your changes appear in elasticsearch, pillowtop must be running:
$ ./manage.py run_ptop --all
You can also run a once-off reindex for a specific index:
$ ./manage.py ptop_reindexer_v2 user
Changing a mapping or adding data¶
If you're adding additional data to elasticsearch, you'll need to modify that index's mapping file in order to be able to query on that new data.
Adding data to an index¶
Each pillow has a function or class that takes in the raw document dictionary and transforms it into the document that gets sent to ES. If, for example, you wanted to store username in addition to user_id on cases in elastic, you'd add username to corehq.pillows.mappings.case_mapping, then modify the transform_case_for_elasticsearch function to do the appropriate lookup. It accepts a doc_dict for the case doc and is expected to return a doc_dict, so just add the username to that.
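A minimal sketch of that change (the lookup helper here is hypothetical; the real code would resolve the username from the user doc or a cache):

def transform_case_for_elasticsearch(doc_dict):
    doc = doc_dict.copy()
    # Hypothetical helper -- look up the username for the case's user_id
    doc['username'] = get_username_by_user_id(doc.get('user_id'))
    return doc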
Building the new index¶
Once you've made the change, you'll need to build a new index which uses that new mapping. Updating the index name in the mapping file triggers HQ to create the new index with the new mapping and reindex all data, so you'll have to update the index hash and alias at the top of the mapping file. The hash suffix to the index can just be a random alphanumeric string; by convention it is usually the date of the edit. The alias should also be updated to a new one of the format xforms_<date-modified> (the date is just by convention), so that production operations continue to use the old alias pointing to the existing index. This will trigger a preindex as outlined in the Indexes section. In subsequent commits the alias can be flipped back to what it was, for example xforms. Changing the alias name doesn't trigger a reindex.
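For illustration, the constants at the top of a mapping file might look something like this (names are hypothetical; check the actual file under corehq/pillows/mappings/ for the real ones):

XFORM_HASH = '2016-07-07'  # random alphanumeric; by convention, the edit date
XFORM_INDEX = 'xforms_' + XFORM_HASH
XFORM_ALIAS = 'xforms_2016-07-07'  # flipped back to 'xforms' in a later commit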
Updating indexes in a production environment¶
Updates in a production environment should be done in two steps, so as not to show incomplete data.
1. Set up a release of your branch using cchq <env> setup_limited_release:keep_days=n_days
2. In your release directory, kick off an index rebuild using ./manage.py ptop_preindex
3. Verify that the reindex has completed successfully. This is a weak point in our current migration process. It can be done by using ES head or the ES APIs to compare document counts to the previous index. You should also actively look for errors in the output of the ptop_preindex command that was run.
4. Merge your PR and deploy your latest master branch.
How to un-bork your broken indexes¶
Sometimes things get in a weird state and (locally!) it’s easiest to just blow away the index and start over.
1. Delete the affected index. The easiest way to do this is with elasticsearch-head. You can delete multiple affected indices with curl -X DELETE http://localhost:9200/*. The * can be replaced with any pattern to delete the matched indices, similar to bash globbing.
2. Run $ ./manage.py ptop_preindex && ./manage.py ptop_es_manage --flip_all_aliases.
3. Try again.
Querying Elasticsearch - Best Practices¶
Here are the most basic things to know if you want to write readable and reasonably performant code for accessing Elasticsearch.
Use ESQuery when possible¶
Check out Querying Elasticsearch
- Prefer the cleaner .count(), .values(), .values_list(), etc. execution methods to the more low-level .run().hits, .run().total, etc. With the latter it's easier to make mistakes and fall into anti-patterns, and it's harder to read.
- Prefer adding filter methods to using set_query() unless you really know what you're doing and are willing to make your code more error-prone and difficult to read.
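For example, both of these return a match count, but the first is clearer and harder to misuse (a sketch; domain is assumed to be in scope):

total = FormES().domain(domain).count()       # preferred
total = FormES().domain(domain).run().total   # low-level equivalent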
Prefer “get” to “search”¶
Don't use search to fetch a doc or doc fields by doc id; use "get" instead. Searching by id can easily be an order of magnitude (10x) slower. If done in a loop, this can effectively grind the ES cluster to a halt.
Bad::
POST /hqcases_2016-03-04/case/_search
{
"query": {
"filtered": {
"filter": {
"and": [{"terms": {"_id": [case_id]}}, {"match_all": {}}]
},
"query": {"match_all":{}}
}
},
"_source": ["name"],
"size":1000000
}
Good::
GET /hqcases_2016-03-04/case/<case_id>?_source_include=name
Prefer scroll queries¶
Use a scroll query when fetching lots of records.
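A sketch using ESQuery's scroll() (documented below), which yields each matching document without paging logic in the caller:

for doc in FormES().domain(domain).scroll():
    handle(doc)  # hypothetical per-document handler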
Prefer filter to query¶
Don’t use query
when you could use filter
if you don’t need rank.
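For instance, a term match that doesn't need relevance scoring belongs in a filter (a sketch using the generic filters module described below):

from corehq.apps.es import filters
q = CaseES().domain(domain).filter(filters.term('type', 'pregnancy'))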
Use size(0) with aggregations¶
Use size(0) when you're only doing aggregations; otherwise you'll get back doc bodies as well! Sometimes that's just abstractly wasteful, but often it can be a serious performance hit for the operation as well as the cluster.
The best way to do this is by using helpers like ESQuery's .count() that know to do this for you; your code will look better and you won't have to remember to check for that every time. (If you ever find helpers not doing this correctly, it's definitely worth fixing.)
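A sketch of an explicit aggregation-only query with doc hits suppressed (TermsAggregation is documented below; the field name is an assumption):

res = (CaseES()
       .domain(domain)
       .aggregation(TermsAggregation('by_type', 'type'))
       .size(0)
       .run())
buckets = res.aggregations.by_type.buckets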
Elasticsearch App¶
Adapter Design¶
The HQ Elastic adapter design came about due to the need for reindexing Elasticsearch indexes in a way that is transparent to parts of HQ that write to Elasticsearch (e.g. pillowtop). Reindexing is necessary for making changes to index mappings, is a prerequisite to upgrading an Elasticsearch cluster, and is also needed for changing low-level index configurations (e.g. sharding).
There is an existing procedure draft that documents the steps that were used on
one occasion to reindex the case_search
index. This procedure leveraged a
custom pillow to “backfill” the cloned index (i.e. initially populated using
Elasticsearch Reindex API). That procedure only works for a subset of HQ
Elasticsearch indexes, and is too risky to be considered as an ongoing Elastic
maintenance strategy. There are several key constraints that an HQ reindexing
procedure should meet which the existing procedure does not:
simple and robust
performed with standard maintenance practices
provides the ability to test and verify the integrity of a new index before it is too late to be rejected
allows HQ Elasticsearch index state to remain decoupled from the commcare-cloud codebase
is not disruptive – does not prohibit any other kind of standard maintenance that might come up while the operation is underway
is “fire and forget” – does not require active polling of intermediate state in order to progress the overall operation
is practical for third party HQ hosters to use
One way to accomplish these constraints is to implement an “index multiplexing” feature in HQ, where Elasticsearch write operations are duplicated across two indexes. This design facilitates maintaining two up-to-date versions of any index (a primary read/write index and a secondary write-only index), allowing HQ to run in a “normal” state (i.e. not a custom “maintenance” state) while providing the ability to switch back and forth (swapping primary and secondary) before fully committing to abandoning one of them. Creating a copy of an index is the unavoidable nature of a reindex operation, and multiplexing allows safe switching from one to the other without causing disruptions or outages while keeping both up-to-date.
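A minimal sketch of the idea (a hypothetical class; the actual multiplexing logic is described under Design Details below and is not yet implemented). Writes are duplicated to both indexes, while reads are always served by the primary:

class MultiplexingAdapter:
    def __init__(self, primary, secondary):
        self.primary = primary
        self.secondary = secondary

    def index(self, doc, **kw):
        # Duplicate every write so both indexes stay up-to-date
        self.primary.index(doc, **kw)
        self.secondary.index(doc, **kw)

    def search(self, query, **kw):
        # Reads always come from the primary index
        return self.primary.search(query, **kw)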
The least disruptive way to accomplish a multiplexing design is with an adapter
layer that operates between the low-level third party Elasticsearch Python
client library and high-level HQ components which need to read/write data in an
Elasticsearch index. HQ already has the initial framework for this layer (the
ElasticsearchInterface
class), so the adapter layer is not a new concept.
The reason that the ElasticsearchInterface
implementation cannot be modified
in-place to accommodate multiplexing is because it is the wrong level of
abstraction. The ElasticsearchInterface
abstraction layer was designed as an
Elasticsearch version abstraction. It provides a common set of functions and
methods so that the high-level HQ “consumer” that uses it can interact with
Elasticsearch documents without knowing which Elasticsearch version is on the
backend. It is below “index-level” logic, and does not implement index-specific
functionality needed in order for some indexes to be handled differently than
others (e.g. some indexes are indexed individually while others are
multiplexed). The document adapter implementation is a document abstraction
layer. It provides a common set of functions and methods to allow high-level HQ
code to perform Elasticsearch operations at the document level, allowing unique
adapters to handle their document operations differently from index to index.
With a multiplexing adapter layer, reindexing an Elasticsearch index can be as few as four concise steps, none of which are time-critical in respect to each other:
Merge and deploy a PR that configures multiplexing on an index.
Execute an idempotent management command that updates the secondary index from its primary counterpart.
Merge and deploy a PR that disables multiplexing for the index, (now using only the new index).
Execute a management command to delete the old index.
Note: the above steps are not limited to a single index at a time. That is, the implementation does not prohibit configuring multiplexing and reindexing multiple indexes at once.
This reindex procedure is inherently safe because:
At any point in the process, the rollback procedure is a simple code change (i.e. revert PR, deploy).
The operation responsible for populating the secondary index is idempotent and decoupled from the index configuration, allowing it to undergo change iterations without aborting the entire process (thereby losing reindex progress).
Instructions for third party hosters can follow the same process that Dimagi uses, which guarantees that any possible problems encountered by a third party hoster are not outside the Dimagi main track.
Design Details¶
Reindex Procedure Details¶
Configure multiplexing on an index.
Configure the document adapter for the index with a “secondary index name”. This will cause the adapter to use multiplexing logic instead of a single index.
Note: The multiplexing logic required for this operation is not yet implemented. The multiplexing adapter will most likely delegate to two document adapters configured for separate indexes. Suffice it to say that when a secondary index is defined for an adapter, it effectively becomes a multiplexing adapter (which to the consumer, is indistinguishable from a “standard” adapter).
(Optional) If the reindex involves other meta-index changes (shards, mappings, etc), also update those configurations at this time.
Add a migration which performs all cluster-level operations required for the new (secondary) index. For example:
creates the new index
configures shards, replicas, etc for the index
sets the index mapping
Review, merge and deploy this change. At Django startup, the new (secondary) index will automatically and immediately begin receiving document writes. Document reads will always come from the primary index.
Execute a management command to sync and verify the secondary index from the primary.
Note: This command is not yet implemented.
This management command is idempotent and performs four operations in serial. If any of the operations complete with unexpected results, the command will abort with an error.
1. Executes an Elastic reindex request with parameters to populate the secondary index from the primary, configured to not overwrite existing documents in the target (secondary) index.
2. Polls the reindex task progress, blocking until complete.
Note: the reindex API also supports a "blocking" mode which may be advantageous due to limitations in Elasticsearch 2.4's Task API. As such, step 2 might be removed in favor of a blocking reindex during the 2.4 –> 5.x upgrade.
3. Performs a cleanup operation on the secondary index to remove tombstone documents.
4. Performs a verification pass to check the integrity of the secondary index.
Note: An exact verification algorithm has not been designed, and complex verification operations may be left out of the first implementation. The reason it is outlined in this design is to identify that verification is supported and would happen at this point in the process. The initial implementation will at least implement feature-equivalency with the previous process (i.e. ensure document counts are equal between the two indexes), and tentatively an "equivalency check" of document _id's (tentative because checking equality while the multiplexer is running is a race condition).
Example command (not yet implemented):
./manage.py elastic_sync_multiplexed ElasticBook
Perform a primary/secondary “swap” operation one or more times as desired to run a “live test” on the new (secondary) index while keeping the old (primary) index up-to-date.
Reconfigure the adapter by swapping the “primary” and “secondary” index names.
Add a migration that cleans up tombstone documents on the “new primary” index prior to startup.
Note: In theory, this step can be optional (e.g. if the sync procedure becomes sufficiently trusted in the future, or for "goldilocks" indexes where rebuilding from source is feasible but advantageous to avoid, etc).
Disable multiplexing for the index.
Reconfigure the document adapter for the index by changing the “primary index name” to the value of the “secondary index name” and remove the secondary configuration (thus reverting the adapter back to a single-index adapter).
Add a migration that cleans up tombstone documents on the index.
Review, merge and deploy this change.
Execute a management command to delete the old index. Example:
./manage.py prune_elastic_index ElasticBook
Elastic Client Adapters¶
The corehq.apps.es.client
module encapsulates the CommCare HQ Elasticsearch
client adapters. It implements a high-level Elasticsearch client protocol
necessary to accomplish all interactions with the backend Elasticsearch cluster.
Client adapters are split into two usage patterns, the “Management Adapter” and
“Document Adapters”.
Management Adapter¶
There is only one management adapter, ElasticManageAdapter
. This adapter is
used for performing all cluster management tasks such as creating and updating
indices and their mappings, changing index settings, changing cluster settings,
etc. This functionality is split into a separate class for a few reasons:
The management adapter is responsible for low-level Elastic operations which document adapters should never be performing because the scope of a document adapter does not extend beyond a single index.
Elasticsearch 5+ implements security features which limit the kinds of operations a connection can be used for. The separation in these client adapter classes is designed to fit into that model.
The management adapter does not need any special parameters to work with, and can be instantiated and used directly:
# Create an index and apply a mapping for the "book" doc type
adapter = ElasticManageAdapter()
adapter.index_create("books")
mapping = {"properties": {
    "author": {"type": "text"},
    "title": {"type": "text"},
    "published": {"type": "date"},
}}
adapter.index_put_mapping("books", "book", mapping)
adapter.index_refresh("books")  # make pending writes visible to search
adapter.index_delete("books")
Document Adapters¶
Document adapters are created on a per-index basis and include specific properties and functionality necessary for maintaining a single type of "model" document in a single index. Each index in Elasticsearch needs to have a corresponding ElasticDocumentAdapter subclass which defines how the Python model is applied to that specific index. At the very least, a document adapter must define the following:
- An _index_name attribute whose value is the name of the Elastic index used by the adapter. This attribute must be private to support proper index naming between production code and tests.
- A type attribute whose value is the name of the Elastic _type for documents used by the adapter.
- A mapping which defines the structure and properties for documents managed by the adapter.
- A from_python() classmethod which can convert a Python model object into the JSON-serializable format for writing into the adapter's index.
The combination of (index_name, type)
constrains the document adapter to
a specific HQ document mapping. Comparing an Elastic cluster to a Postgres
database (for the sake of analogy), the Elastic index is analogous to a
Postgres schema object (e.g. public
), and the _type
property is
analogous to a Postgres table object. The combination of both index name
and _type
fully constrains the properties that make up a specific Elastic
document.
A simple example of a document model and its corresponding adapter:
class Book:
def __init__(self, isbn, author, title, published):
self.isbn = isbn
self.author = author
self.title = title
self.published = published
class ElasticBook(ElasticDocumentAdapter):
_index_name = "books"
type = "book"
mapping = {"properties": {
"author": {"type": "text"},
"title": {"type": "text"},
"published": {"type": "date"},
}}
@classmethod
def from_python(cls, book):
source = {
"author": book.author,
"title": book.title,
"published": book.published,
}
return book.isbn, source
Using this adapter in practice might look as follows:
adapter = ElasticBook()
# index new
new_book = Book(
"978-1491946008",
"Luciano Ramalho",
"Fluent Python: Clear, Concise, and Effective Programming",
datetime.date(2015, 2, 10),
)
adapter.index(new_book)
# fetch existing
classic_book = adapter.fetch("978-0345391803")
Code Documentation¶
HQ Elasticsearch client logic (adapters).
- class corehq.apps.es.client.BaseAdapter(for_export=False)[source]¶
Base adapter that includes methods common to all adapters.
- class corehq.apps.es.client.BulkActionItem(op_type, doc=None, doc_id=None)[source]¶
A wrapper for documents to be processed via Elasticsearch's Bulk API. Collections of these objects can be passed to an ElasticDocumentAdapter's .bulk() method for processing.
Instances of this class are meant to be acquired via one of the factory methods rather than instantiating directly (via __init__()).
- class OpType(value)¶
An enumeration.
- classmethod delete_id(doc_id)[source]¶
Factory method for a document delete action providing only the ID
- property is_delete¶
True if this is a delete action, otherwise False.
- property is_index¶
True if this is an index action, otherwise False.
- class corehq.apps.es.client.ElasticDocumentAdapter(for_export=False)[source]¶
Base for subclassing document-specific adapters.
Subclasses must define the following:
- _index_name: class attribute (str)
- type: class attribute (str)
- mapping: class attribute (dict)
- from_python(...): classmethod for converting models into Elastic format
- bulk(actions, refresh=False, **kw)[source]¶
Use the Elasticsearch library's bulk() helper function to process documents en masse.
Equivalent to the legacy ElasticsearchInterface.bulk_ops(...) method.
- Parameters
actions – iterable of BulkActionItem instances
refresh – bool refresh the affected shards to make this operation visible to search
**kw – extra parameters passed directly to the underlying elasticsearch.helpers.bulk() function.
- bulk_delete(doc_ids, refresh=False, **kw)[source]¶
Convenience method for bulk deleting many documents by ID without the BulkActionItem boilerplate.
- Parameters
doc_ids – iterable of document IDs to be deleted
refresh – bool refresh the affected shards to make this operation visible to search
**kw – extra parameters passed directly to the underlying elasticsearch.helpers.bulk() function.
- bulk_index(docs, refresh=False, **kw)[source]¶
Convenience method for bulk indexing many documents without the BulkActionItem boilerplate.
- Parameters
docs – iterable of (Python model) documents to be indexed
refresh – bool refresh the affected shards to make this operation visible to search
**kw – extra parameters passed directly to the underlying elasticsearch.helpers.bulk() function.
- count(query)[source]¶
Return the number of documents matched by the query.
- Parameters
query – dict query body
- Returns
int
- delete(doc_id, refresh=False)[source]¶
Delete an existing document from Elasticsearch
Equivalent to the legacy ElasticsearchInterface.delete_doc(...) method.
- Parameters
doc_id – str ID of the document to delete
refresh – bool refresh the affected shards to make this operation visible to search
- exists(doc_id)[source]¶
Check if a document exists for the provided doc_id.
Equivalent to the legacy ElasticsearchInterface.doc_exists(...) method.
- Parameters
doc_id – str ID of the document to be checked
- Returns
bool
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- get(doc_id, source_includes=[])[source]¶
Return the document for the provided doc_id.
Equivalent to the legacy ElasticsearchInterface.get_doc(...) method.
- Parameters
doc_id – str ID of the document to be fetched
source_includes – a list of fields to extract and return
- Returns
dict
- get_docs(doc_ids)[source]¶
Return multiple docs for the provided doc_ids.
Equivalent to the legacy ElasticsearchInterface.get_bulk_docs(...) method.
- Parameters
doc_ids – iterable of document IDs (str's)
- Returns
dict
- index(doc, refresh=False, **kw)[source]¶
Index (send) a new document in (to) Elasticsearch
Equivalent to the legacy ElasticsearchInterface.index_doc(...) method.
- Parameters
doc – the (Python model) document to index
refresh – bool refresh the affected shards to make this operation visible to search
**kw – extra parameters passed directly to the underlying elasticsearch.Elasticsearch.index() method.
- iter_docs(doc_ids, chunk_size=100)[source]¶
Return a generator which fetches documents in chunks.
- Parameters
doc_ids – iterable of document IDs (str's)
chunk_size – int number of documents to fetch per query
- Yields
dict documents
- scroll(query, **kw)[source]¶
Perform a scrolling search, yielding each doc until the entire context is exhausted.
- Parameters
query – dict raw search query.
**kw – additional scroll keyword arguments. Valid options:
size: int scroll size (number of documents per "scroll" page)
scroll: str time value specifying how long the Elastic cluster should keep the search context alive.
- Yields
dict documents
- search(query, **kw)[source]¶
Perform a query (search) and return the result.
- Parameters
query – dict search query to execute
**kw – extra parameters passed directly to the underlying elasticsearch.Elasticsearch.search() method.
- Returns
dict
- classmethod to_json(doc)[source]¶
Convenience method that returns the full "from python" document (including the _id key, if present) as it would be returned by an adapter search result.
This method is not used by the adapter itself, and is only present for other code which wishes to work with documents in a couch-like format.
- Parameters
doc – document (instance of a Python model)
- update(doc_id, fields, refresh=False, **kw)[source]¶
Update an existing document in Elasticsearch
Equivalent to the legacy ElasticsearchInterface.update_doc_fields(...) method.
- Parameters
doc_id – str ID of the document to update
fields – dict of fields/values to update on the existing Elastic doc
refresh – bool refresh the affected shards to make this operation visible to search
**kw – extra parameters passed directly to the underlying elasticsearch.Elasticsearch.update() method.
- class corehq.apps.es.client.ElasticManageAdapter[source]¶
- cluster_routing(*, enabled)[source]¶
Enable or disable cluster routing.
- Parameters
enabled – bool whether to enable or disable routing
- get_aliases()[source]¶
Return the cluster aliases information.
- Returns
dict with format {<alias>: [<index>, ...], ...}
- get_indices(full_info=False)[source]¶
Return the cluster index information.
- Parameters
full_info – bool whether to return the full index info (default False)
- Returns
dict
- get_node_info(node_id, metric)[source]¶
Return a specific metric from the node info for an Elasticsearch node.
- Parameters
node_id – str ID of the node
metric – str name of the metric to fetch
- Returns
deserialized JSON (dict, list, str, etc)
- get_task(task_id)[source]¶
Return the details for an active task.
- Parameters
task_id – str ID of the task
- Returns
dict of task details
- Raises
TaskError or TaskMissing (subclass of TaskError)
- index_configure_for_reindex(index)[source]¶
Update an index with settings optimized for reindexing.
- Parameters
index – str index for which to change the settings
- index_configure_for_standard_ops(index)[source]¶
Update an index with settings optimized for standard HQ performance.
- Parameters
index – str index for which to change the settings
- index_create(index, settings=None)[source]¶
Create a new index.
- Parameters
index – str index name
settings – dict of index settings
- index_exists(index)[source]¶
Check if index refers to a valid index identifier (index name or alias).
- Parameters
index – str index name or alias
- Returns
bool
- index_put_alias(index, name)[source]¶
Assign an alias to an existing index. This uses the Elasticsearch.update_aliases() method to perform both 'remove' and 'add' actions simultaneously, which is atomic on the server side. This ensures that the alias is only assigned to one index at a time, and that (if present) an existing alias does not vanish momentarily.
See: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html
- Parameters
index – str name of the index to be aliased
name – str name of the alias to assign to index
- index_put_mapping(index, type_, mapping)[source]¶
Update the mapping for a doc type on an index.
- Parameters
index – str index where the mapping should be updated
type_ – str doc type to update on the index
mapping – dict mapping for the provided doc type
Querying Elasticsearch¶
ESQuery¶
ESQuery is a library for building elasticsearch queries in a friendly, more readable manner.
Basic usage¶
There should be a file and subclass of ESQuery for each index we have.
Each method returns a new object, so you can chain calls together like SQLAlchemy. Here’s an example usage:
q = (FormsES()
.domain(self.domain)
.xmlns(self.xmlns)
.submitted(gte=self.datespan.startdate_param,
lt=self.datespan.enddateparam)
.source(['xmlns', 'domain', 'app_id'])
.sort('received_on', desc=False)
.size(self.pagination.count)
.start(self.pagination.start)
.terms_aggregation('babies.count', 'babies_saved'))
result = q.run()
total_docs = result.total
hits = result.hits
Generally useful filters and queries should be abstracted away for re-use, but you can always add your own like so:
q.filter({"some_arbitrary_filter": {...}})
q.set_query({"fancy_query": {...}})
For debugging or more helpful error messages, you can use query.dumps() and query.pprint(), both of which use json.dumps() and are suitable for pasting into ES Head or Marvel.
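A quick sketch (assuming dumps() returns the JSON string and pprint() prints it):

q.pprint()       # pretty-print the query body
raw = q.dumps()  # or capture it as a JSON string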
Filtering¶
Filters are implemented as standalone functions, so they can be composed and nested, e.g. q.OR(web_users(), mobile_users()).
Filters can be passed to the query.filter method: q.filter(web_users())
There is some syntactic sugar that lets you skip this boilerplate and just
call the filter as if it were a method on the query class: q.web_users()
In order to be available for this shorthand, filters are added to the
builtin_filters
property of the main query class.
I know that’s a bit confusing, but it seemed like the best way to make filters
available in both contexts.
Generic filters applicable to all indices are available in
corehq.apps.es.filters
. (But most/all can also be accessed as a query
method, if appropriate)
Filtering Specific Indices¶
There is a file for each elasticsearch index (if not, feel free to add one). This file provides filters specific to that index, as well as an appropriately-directed ESQuery subclass with references to these filters.
These index-specific query classes also have default filters to exclude things
like inactive users or deleted docs.
These things should nearly always be excluded, but if necessary, you can remove these with remove_default_filters.
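For example (a sketch; this drops defaults like the 'active' and 'not_deleted' filters shown for UserES below):

q = UserES().remove_default_filters().domain(domain)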
Language¶
es_query - the entire query, filters, query, pagination
filters - a list of the individual filters
query - the query, used for searching, not filtering
field - a field on the document. User docs have a ‘domain’ field.
lt/gt - less/greater than
lte/gte - less/greater than or equal to
- class corehq.apps.es.es_query.ESQuery(index=None, for_export=False)[source]¶
This query builder only outputs the following query structure:
{ "query": { "bool": { "filter": { "and": [ <filters> ] }, "query": <query> } }, <size, sort, other params> }
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
- fields(fields)[source]¶
Restrict the fields returned from elasticsearch
Deprecated. Use source instead.
- filter(filter)[source]¶
Add the passed-in filter to the query. All filtering goes through this class.
- property filters¶
Return a list of the filters used in this query, suitable if you want to reproduce a query with additional filtering.
- get_ids()[source]¶
Performs a minimal query to get the ids of the matching documents
For very large sets of IDs, use
scroll_ids
instead
- nested_sort(path, field_name, nested_filter, desc=False, reset_sort=True)[source]¶
Order results by the value of a nested field
- scroll()[source]¶
Run the query against the scroll api. Returns an iterator yielding each document that matches the query.
- search_string_query(search_string, default_fields=None)[source]¶
Accepts a user-defined search string
- set_query(query)[source]¶
Set the query. Most stuff we want is better done with filters, but if you actually want Levenshtein distance or prefix querying…
- set_sorting_block(sorting_block)[source]¶
To be used with get_sorting_block, which interprets datatables sorting
- class corehq.apps.es.es_query.ESQuerySet(raw, query)[source]¶
- The object returned from
ESQuery.run
ESQuerySet.raw is the raw response from elasticsearch
ESQuerySet.query is the ESQuery object
- property doc_ids¶
Return just the docs ids from the response.
- property hits¶
Return the docs from the response.
- property total¶
Return the total number of docs matching the query.
- class corehq.apps.es.es_query.HQESQuery(index=None, for_export=False)[source]¶
Query logic specific to CommCareHQ
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
Available Filters¶
The following filters are available on any ESQuery instance - you can chain any of these on your query.
Note also that the term
filter accepts either a list or a single element.
Simple filters which match against a field are based on this filter, so those
will also accept lists.
That means you can do form_query.xmlns(XMLNS1)
or
form_query.xmlns([XMLNS1, XMLNS2, ...])
.
Contributing:
Additions to this file should be added to the builtin_filters
method on
either ESQuery or HQESQuery, as appropriate (is it an HQ thing?).
- corehq.apps.es.filters.date_range(field, gt=None, gte=None, lt=None, lte=None)[source]¶
Range filter that accepts date and datetime objects as arguments
- corehq.apps.es.filters.empty(field)[source]¶
Only return docs with a missing or null value for
field
- corehq.apps.es.filters.nested(path, filter_)[source]¶
Query nested documents which normally can’t be queried directly
- corehq.apps.es.filters.non_null(field)[source]¶
Only return docs with a real, non-null value for
field
Available Queries¶
Queries are used for actual searching - things like relevancy scores, Levenshtein distance, and partial matches.
View the elasticsearch documentation to see what other options are available, and put ‘em here if you end up using any of ‘em.
- corehq.apps.es.queries.filtered(query, filter_)[source]¶
Filtered query for performing both filtering and querying at once
- corehq.apps.es.queries.geo_distance(field, geopoint, **kwargs)[source]¶
Filters cases to those within a certain distance of the provided geopoint
eg: geo_distance(‘gps_location’, GeoPoint(-33.1, 151.8), kilometers=100)
- corehq.apps.es.queries.nested(path, query, *args, **kwargs)[source]¶
Creates a nested query for use with nested documents
Keyword arguments such as score_mode and others can be added.
- corehq.apps.es.queries.nested_filter(path, filter_, *args, **kwargs)[source]¶
Creates a nested query for use with nested documents
Keyword arguments such as score_mode and others can be added.
- corehq.apps.es.queries.search_string_query(search_string, default_fields=None)[source]¶
Allows users to use advanced query syntax, but if search_string does not use the ES query string syntax, default to doing an infix search for each term. (This may later change to some kind of fuzzy matching.)
This is also available via the main ESQuery class.
Aggregate Queries¶
Aggregations are a replacement for Facets
Here is an example used to calculate how many new pregnancy cases each user has opened in a certain date range.
res = (CaseES()
       .domain(self.domain)
       .case_type('pregnancy')
       .date_range('opened_on', gte=startdate, lte=enddate)
       .aggregation(TermsAggregation('by_user', 'opened_by'))
       .size(0)
       .run())
buckets = res.aggregations.by_user.buckets
buckets.user1.doc_count
There’s a bit of magic happening here - you can access the raw json data from
this aggregation via res.aggregation('by_user')
if you’d prefer to skip it.
The res object has an aggregations property, which returns a namedtuple pointing to the wrapped aggregation results. The name provided at instantiation is used here (by_user in this example).
The wrapped aggregation_result object has a result property containing the aggregation data, as well as utilities for parsing that data into something more useful. For example, the TermsAggregation result also has a counts_by_bucket method that returns a {bucket: count} dictionary, which is normally what you want.
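A sketch of that convenience, using the names from the example above:

counts = res.aggregations.by_user.counts_by_bucket()  # {bucket: count}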
As of this writing, there's not much else developed, but it's pretty easy to add support for other aggregation types and more results processing.
- class corehq.apps.es.aggregations.AggregationRange(start=None, end=None, key=None)[source]¶
Note that a range includes the “start” value and excludes the “end” value. i.e. start <= X < end
- Parameters
start – range start
end – range end
key – optional key name for the range
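A sketch of AggregationRange in use with RangeAggregation (documented below); the field name opened_on is an assumption:

agg = RangeAggregation('opened', 'opened_on', ranges=[
    AggregationRange(end='2015-01-01', key='before_2015'),
    AggregationRange(start='2015-01-01', key='since_2015'),
])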
- class corehq.apps.es.aggregations.AggregationTerm(name, field)¶
- field¶
Alias for field number 1
- name¶
Alias for field number 0
- class corehq.apps.es.aggregations.DateHistogram(name, datefield, interval, timezone=None)[source]¶
Aggregate by date range. This can answer questions like “how many forms were created each day?”.
This class can be instantiated by the
ESQuery.date_histogram
method.- Parameters
name – what do you want to call this aggregation
datefield – the document’s date field to look at
interval – the date interval to use: “year”, “quarter”, “month”, “week”, “day”, “hour”, “minute”, “second”
timezone – do bucketing using this time zone instead of UTC
- class corehq.apps.es.aggregations.ExtendedStatsAggregation(name, field, script=None)[source]¶
Extended stats aggregation that computes an extended stats aggregation by field
- class corehq.apps.es.aggregations.FilterAggregation(name, filter)[source]¶
Bucket aggregation that creates a single bucket for the specified filter
- Parameters
name – aggregation name
filter – filter body
- class corehq.apps.es.aggregations.FiltersAggregation(name, filters=None)[source]¶
Bucket aggregation that creates a bucket for each filter specified using the filter name.
- Parameters
name – aggregation name
- class corehq.apps.es.aggregations.MinAggregation(name, field)[source]¶
Bucket aggregation that returns the minimum value of a field
- Parameters
name – aggregation name
field – name of the field to min
- class corehq.apps.es.aggregations.MissingAggregation(name, field)[source]¶
A field data based single bucket aggregation, that creates a bucket of all documents in the current document set context that are missing a field value (effectively, missing a field or having the configured NULL value set).
- Parameters
name – aggregation name
field – name of the field to bucket on
- class corehq.apps.es.aggregations.NestedAggregation(name, path)[source]¶
A special single bucket aggregation that enables aggregating nested documents.
- Parameters
path – Path to nested document
- class corehq.apps.es.aggregations.NestedTermAggregationsHelper(base_query, terms)[source]¶
Helper to run nested term-based queries (equivalent to SQL group-by clauses). This is not at all related to the ES ‘nested aggregation’. The final aggregation is a count of documents.
Example usage:
# counting all forms submitted in a domain grouped by app id and user id
NestedTermAggregationsHelper(
    base_query=FormES().domain(domain_name),
    terms=[
        AggregationTerm('app_id', 'app_id'),
        AggregationTerm('user_id', 'form.meta.userID'),
    ]
).get_data()
This works by bucketing docs first by one terms aggregation, then within that bucket, bucketing further by the next term, and so on. This is then flattened out to appear like a group-by-multiple.
- class corehq.apps.es.aggregations.RangeAggregation(name, field, ranges=None, keyed=True)[source]¶
Bucket aggregation that creates one bucket for each range
- Parameters
name – the aggregation name
field – the field to perform the range aggregations on
ranges – list of AggregationRange objects
keyed – set to True to have the results returned by key instead of as a list (see RangeResult.normalized_buckets)
- class corehq.apps.es.aggregations.StatsAggregation(name, field, script=None)[source]¶
Stats aggregation that computes a stats aggregation by field
- Parameters
name – aggregation name
field – name of the field to collect stats on
script – an optional field to allow you to script the computed field
- class corehq.apps.es.aggregations.SumAggregation(name, field)[source]¶
Bucket aggregation that sums a field
- Parameters
name – aggregation name
field – name of the field to sum
- class corehq.apps.es.aggregations.TermsAggregation(name, field, size=None, missing=None)[source]¶
Bucket aggregation that aggregates by field
- Parameters
name – aggregation name
field – name of the field to bucket on
size –
missing – define how documents that are missing a value should be treated. By default, they will be ignored. If a value is supplied here it will be used where the value is missing.
- class corehq.apps.es.aggregations.TopHitsAggregation(name, field=None, is_ascending=True, size=1, include=None)[source]¶
A top_hits metric aggregator keeps track of the most relevant document being aggregated This aggregator is intended to be used as a sub aggregator, so that the top matching documents can be aggregated per bucket.
- Parameters
name – Aggregation name
field – This is the field to sort the top hits by. If None, defaults to sorting by score.
is_ascending – Whether to sort the hits in ascending or descending order.
size – The number of hits to include. Defaults to 1.
include – An array of fields to include in the hit. Defaults to returning the whole document.
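A sketch of using it as a sub-aggregation to fetch each user's most recent form; nesting via .aggregation() on the bucket aggregation is an assumption here:

agg = TermsAggregation('by_user', 'form.meta.userID').aggregation(
    TopHitsAggregation('latest_form', field='received_on', is_ascending=False))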
AppES¶
- class corehq.apps.es.apps.AppES(index=None, for_export=False)[source]¶
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
- index = 'apps'¶
- class corehq.apps.es.apps.ElasticApp(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'app'¶
UserES¶
Here’s an example adapted from the case list report - it gets a list of the ids of all unknown users, web users, and demo users on a domain.
from corehq.apps.es import users as user_es
user_filters = [
user_es.unknown_users(),
user_es.web_users(),
user_es.demo_users(),
]
query = (user_es.UserES()
.domain(self.domain)
.OR(*user_filters)
.show_inactive())
owner_ids = query.get_ids()
- class corehq.apps.es.users.ElasticUser(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'user'¶
- class corehq.apps.es.users.UserES(index=None, for_export=False)[source]¶
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
- default_filters = {'active': {'term': {'is_active': True}}, 'not_deleted': {'term': {'base_doc': 'couchuser'}}}¶
- index = 'users'¶
- corehq.apps.es.users.admin_users()[source]¶
Return only AdminUsers. Admin users are mock users created from xform submissions with unknown user ids whose username is “admin”.
CaseES¶
Here’s an example getting pregnancy cases that are either still open or were closed after May 1st.
from corehq.apps.es import cases as case_es
q = (case_es.CaseES()
.domain('testproject')
.case_type('pregnancy')
.OR(case_es.is_closed(False),
case_es.closed_range(gte=datetime.date(2015, 5, 1))))
- class corehq.apps.es.cases.CaseES(index=None, for_export=False)[source]¶
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
- index = 'cases'¶
- class corehq.apps.es.cases.ElasticCase(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'case'¶
- class corehq.apps.es.cases.ElasticReportCase(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'report_case'¶
- corehq.apps.es.cases.active_in_range(gt=None, gte=None, lt=None, lte=None)[source]¶
Restricts cases returned to those with actions during the range
FormES¶
- class corehq.apps.es.forms.ElasticForm(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'xform'¶
- class corehq.apps.es.forms.ElasticReportForm(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'report_xform'¶
- class corehq.apps.es.forms.FormES(index=None, for_export=False)[source]¶
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
- default_filters = {'has_domain': {'exists': {'field': 'domain'}}, 'has_user': {'exists': {'field': 'form.meta.userID'}}, 'has_xmlns': {'exists': {'field': 'xmlns'}}, 'is_xform_instance': {'term': {'doc_type': 'xforminstance'}}}¶
- index = 'forms'¶
DomainES¶
Here’s an example generating a histogram of domain creations (that’s a type of faceted query), filtered by a provided list of domains and a report date range.
from corehq.apps.es import DomainES
domains_after_date = (DomainES()
.in_domains(domains)
.created(gte=datespan.startdate, lte=datespan.enddate)
.date_histogram('date', 'date_created', interval)
.size(0))
histo_data = domains_after_date.run().aggregations.date.buckets_list
- class corehq.apps.es.domains.DomainES(index=None, for_export=False)[source]¶
- property builtin_filters¶
A list of callables that return filters. These will all be available as instance methods, so you can do self.term(field, value) instead of self.filter(filters.term(field, value)).
- default_filters = {'not_snapshot': {'bool': {'must_not': {'term': {'is_snapshot': True}}}}}¶
- index = 'domains'¶
- class corehq.apps.es.domains.ElasticDomain(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'hqdomain'¶
SMSES¶
- class corehq.apps.es.sms.ElasticSMS(for_export=False)[source]¶
- classmethod from_python(doc)[source]¶
Transform a Python model object into the JSON-serializable (dict) format suitable for indexing in Elasticsearch.
- Parameters
doc – document (instance of a Python model)
- Returns
tuple of (doc_id, source_dict) suitable for being indexed/updated/deleted in Elasticsearch
- property mapping¶
- type = 'sms'¶