Pillows

Overview

What are pillows

Pillows are a component of the publisher/subscriber design pattern that is used for asynchronous communication.

A pillow subscribes to a change feed, and when changes are received, performs specific operations related to that change.

Why do we need pillows

In CommCare HQ, pillows are primarily used to update secondary databases like Elasticsearch and User Configurable Reports (UCRs). Examples of other use cases are invalidating cache or checking if alerts need to be sent.

How do pillows receive changes

We use Kafka as our message queue, which allows producers to publish changes to the queue, and consumers (i.e. pillows) to listen for and process those changes.

Kafka uses _topics_ to organize related changes, and pillows can listen for changes to one or more specific topics.

Why the name

Pillows, as part of the pillowtop framework, were created by us to consume and process changes from the CouchDB change feed. Our usage of pillows has since expanded beyond CouchDB.

Deconstructing a Pillow

All pillows inherit from the ConstructedPillow class. A pillow consists of a few parts:

  1. Change Feed

  2. Checkpoint

  3. Processor(s)

  4. Change Event Handler

Change Feed

The brief overview is that a change feed publishes changes which a pillow can subscribe to. When setting up a pillow, an instance of a ChangeFeed class is created and configured to only contain changes the pillow cares about.

For more information about change feeds, see Change Feeds.

Checkpoint

The checkpoint is a json field that tells processor where to start the change feed.

Processors

A processor is a method that operates on the incoming change. Historically, we had one processor per pillow, however we have since shifted to favor multiple processors for each pillow. This way, all processors can operate on the change which ensures all operations relevant for a change happen within relatively the same time window.

When creating a processor you should be aware of how much time it will take to process the record. A useful baseline is:

86400 seconds per day / # of expected changes per day = how long your processor should take

Note that it should be faster than this as most changes will come in at once instead of evenly distributed throughout the day.

Change Event Handler

This fires after each change has been processed. The main use case is to save the checkpoint to the database.

Error Handling

Errors

Pillows can fail to process a change for a number of reasons. The most common causes of pillow errors are a code bug, or a failure in a dependent service (e.g., attempting to save a change to Elasticsearch but it is unreachable).

Errors encountered in processors are handled by creating an instance of the PillowError database model.

Retries

The run_pillow_retry_queue command is configured to run continuously in a celery queue, and looks for new PillowError objects to retry. A pillow has the option to disable retrying errors via the retry_errors property.

If the related pillow reads from a Kafka change feed, the change associated with the error is re-published into Kafka. However if it reads from a Couch change feed, the pillow’s processor is called directly with the change passed in. In both cases, the PillowError is deleted, a new one will be created if it fails again.

Monitoring

There are several datadog metrics with the prefix commcare.change_feed that can be helpful for monitoring pillows. Generally these metrics will have tags for pillow name, topic, and partition to filter on.

Metric (not including commcare.change_feed)

Description

change_lag

The current time - when the last change processed was put into the queue

changes.count

Number of changes processed

changes.success

Number of changes processed successfully

changes.exceptions

Number of changes processed with an exception

processor.timing

Time spent in processing a document. Different tags for extract/transform/load steps.

processed_offsets

Latest offset that has been processed by the pillow

current_offsets

The current offsets of each partition in kafka (useful for math in dashboards)

need_processing

current_offsets - processed_offsets

Generally when planning for pillows, you should:
  • Minimize change_lag
    • ensures changes are processed in a reasonable time (e.g., up to date reports for users)

  • Minimize changes.exceptions
    • ensures consistency across application (e.g., secondary databases contain accurate data)

    • more exceptions mean more load since they will be reprocessed at a later time

  • Minimize number of pillows running
    • minimizes server resources required

The ideal setup would have 1 pillow with no exceptions and 0 second lag.

Troubleshooting

A pillow is falling behind

Otherwise known as “pillow lag”, a pillow can fall behind for a few reasons:

  1. The processor is too slow for the number of changes that are coming in.

  2. There was an issue with the change feed that caused the checkpoint to be “rewound”.

  3. A processor continues to fail so changes are re-queued and processed again later.

Lag is inherent to asynchronous change processing, so the question is what amount of lag is acceptable for users.

Optimizing a processor

To solve #1 you should use any monitors that have been set up to attempt to pinpoint the issue. commcare.change_feed.processor.timing can help determine what processors/pillows are the root cause of slow processing.

If this is a UCR pillow use the profile_data_source management command to profile the expensive data sources.

Parallel Processors

To scale pillows horizontally do the following:

  1. Look for what pillows are behind. This can be found in the change feed dashboard or the hq admin system info page.

  2. Ensure you have enough resources on the pillow server to scale the pillows. This can be found through datadog.

  3. Decide what topics need to have added partitions in kafka. There is no way to scale a couch pillow horizontally. Removing partitions isn’t straightforward, so you should attempt scaling in small increments. Also make sure pillows are able to split partitions easily by using powers of 2.

  4. Run ./manage.py add_kafka_partition <topic> <number partitions to have>

  5. In the commcare-cloud repo environments/<env>/app-processes.yml file change num_processes to the pillows you want to scale.

  6. On the next deploy multiple processes will be used when starting pillows

Note that pillows will automatically divide up partitions based on the number of partitions and the number of processes for the pillow. It doesn’t have to be one to one, and you don’t have to specify the mapping manually. That means you can create more partitions than you need without changing the number of pillow processes and just restart pillows for the change to take effect. Later you can just change the number of processes without touching the number of partitions, and and just update the supervisor conf and restarting pillows for the change to take effect.

The UCR pillows also have options to split the pillow into multiple. They include ucr_divsion, include_ucrs and exclude_ucrs. Look to the pillow code for more information on these.

Rewound Checkpoint

Occasionally checkpoints will be “rewound” to a previous state causing pillows to process changes that have already been processed. This usually happens when a couch node fails over to another. If this occurs, stop the pillow, wait for confirmation that the couch nodes are up, and fix the checkpoint using: ./manage.py fix_checkpoint_after_rewind <pillow_name>

Many pillow exceptions

commcare.change_feed.changes.exceptions has tag exception_type that reports the name and path of the exception encountered. These exceptions could be from coding errors or from infrastructure issues. If they are from infrastructure issues (e.g. ES timeouts) some solutions could be:

  • Scale ES cluster (more nodes, shards, etc)

  • Reduce number of pillow processes that are writing to ES

  • Reduce other usages of ES if possible (e.g. if some custom code relies on ES, could it use UCRs, https://github.com/dimagi/commcare-hq/pull/26241)

Problem with checkpoint for pillow name: First available topic offset for topic is num1 but needed num2

This happens when the earliest checkpoint that kafka knows about for a topic is after the checkpoint the pillow wants to start at. This often happens if a pillow has been stopped for a month and has not been removed from the settings.

To fix this you should verify that the pillow is no longer needed in the environment. If it isn’t, you can delete the checkpoint and re-deploy. This should eventually be followed up by removing the pillow from the settings.

If the pillow is needed and should be running you’re in a bit of a pickle. This means that the pillow is not able to get the required document ids from kafka. It also won’t be clear what documents the pillows has and has not processed. To fix this the safest thing will be to force the pillow to go through all relevant docs. Once this process is started you can move the checkpoint for that pillow to the most recent offset for its topic.

Pillows

corehq.pillows.case.get_case_pillow(pillow_id='case-pillow', ucr_division=None, include_ucrs=None, exclude_ucrs=None, num_processes=1, process_num=0, ucr_configs=None, skip_ucr=False, processor_chunk_size=10, topics=None, dedicated_migration_process=False, **kwargs)[source]

Return a pillow that processes cases. The processors include, UCR and elastic processors

Processors:
corehq.pillows.xform.get_xform_pillow(pillow_id='xform-pillow', ucr_division=None, include_ucrs=None, exclude_ucrs=None, num_processes=1, process_num=0, ucr_configs=None, skip_ucr=False, processor_chunk_size=10, topics=None, dedicated_migration_process=False, **kwargs)[source]

Generic XForm change processor

Processors:
corehq.pillows.case.get_case_to_elasticsearch_pillow(pillow_id='CaseToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

Return a pillow that processes cases to Elasticsearch.

Processors:
corehq.pillows.xform.get_xform_to_elasticsearch_pillow(pillow_id='XFormToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

XForm change processor that sends form data to Elasticsearch

Processors:
corehq.pillows.user.get_user_pillow(pillow_id='user-pillow', num_processes=1, dedicated_migration_process=False, process_num=0, skip_ucr=False, processor_chunk_size=10, **kwargs)[source]

Processes users and sends them to ES and UCRs.

Processors:
corehq.pillows.user.get_user_pillow_old(pillow_id='UserPillow', num_processes=1, process_num=0, **kwargs)[source]

Processes users and sends them to ES.

Processors:
corehq.apps.userreports.pillow.get_location_pillow(pillow_id='location-ucr-pillow', include_ucrs=None, num_processes=1, process_num=0, ucr_configs=None, **kwargs)[source]

Processes updates to locations for UCR

Note this is only applicable if a domain on the environment has LOCATIONS_IN_UCR flag enabled.

Processors:
corehq.pillows.groups_to_user.get_group_pillow(pillow_id='group-pillow', num_processes=1, process_num=0, **kwargs)[source]

Group pillow

Processors:
corehq.pillows.group.get_group_pillow_old(pillow_id='GroupPillow', num_processes=1, process_num=0, **kwargs)[source]

Group pillow (old). Sends Group data to Elasticsearch

Processors:
corehq.pillows.groups_to_user.get_group_to_user_pillow(pillow_id='GroupToUserPillow', num_processes=1, process_num=0, **kwargs)[source]

Group pillow that updates user data in Elasticsearch with group membership

Processors:
corehq.pillows.ledger.get_ledger_to_elasticsearch_pillow(pillow_id='LedgerToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

Ledger pillow

Note that this pillow’s id references Elasticsearch, but it no longer saves to ES. It has been kept to keep the checkpoint consistent, and can be changed at any time.

Processors:
corehq.pillows.domain.get_domain_kafka_to_elasticsearch_pillow(pillow_id='KafkaDomainPillow', num_processes=1, process_num=0, **kwargs)[source]

Domain pillow to replicate documents to ES

Processors:
corehq.pillows.sms.get_sql_sms_pillow(pillow_id='SqlSMSPillow', num_processes=1, process_num=0, processor_chunk_size=10, **kwargs)[source]

SMS Pillow

Processors:
corehq.apps.userreports.pillow.get_kafka_ucr_pillow(pillow_id='kafka-ucr-main', ucr_division=None, include_ucrs=None, exclude_ucrs=None, topics=None, num_processes=1, process_num=0, dedicated_migration_process=False, processor_chunk_size=10, **kwargs)[source]

UCR pillow that reads from all Kafka topics and writes data into the UCR database tables.

Processors:
corehq.apps.userreports.pillow.get_kafka_ucr_static_pillow(pillow_id='kafka-ucr-static', ucr_division=None, include_ucrs=None, exclude_ucrs=None, topics=None, num_processes=1, process_num=0, dedicated_migration_process=False, processor_chunk_size=10, **kwargs)[source]

UCR pillow that reads from all Kafka topics and writes data into the UCR database tables.

Only processes static UCR datasources (configuration lives in the codebase instead of the database).

corehq.pillows.synclog.get_user_sync_history_pillow(pillow_id='UpdateUserSyncHistoryPillow', num_processes=1, process_num=0, **kwargs)[source]

Synclog pillow

Processors:
corehq.pillows.application.get_app_to_elasticsearch_pillow(pillow_id='ApplicationToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

App pillow

Processors:
corehq.pillows.app_submission_tracker.get_form_submission_metadata_tracker_pillow(pillow_id='FormSubmissionMetadataTrackerPillow', num_processes=1, process_num=0, **kwargs)[source]

This gets a pillow which iterates through all forms and marks the corresponding app as having submissions.

corehq.pillows.user.get_unknown_users_pillow(pillow_id='unknown-users-pillow', num_processes=1, process_num=0, **kwargs)[source]

This pillow adds users from xform submissions that come in to the User Index if they don’t exist in HQ

Processors:
corehq.messaging.pillow.get_case_messaging_sync_pillow(pillow_id='case_messaging_sync_pillow', topics=None, num_processes=1, process_num=0, processor_chunk_size=10, **kwargs)[source]

Pillow for synchronizing messaging data with case data.

Processors:
corehq.pillows.case_search.get_case_search_to_elasticsearch_pillow(pillow_id='CaseSearchToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

Populates the case search Elasticsearch index.

Processors:
  • corehq.pillows.case_search.CaseSearchPillowProcessor

corehq.pillows.cacheinvalidate._get_cache_invalidation_pillow(pillow_id, couch_db, couch_filter=None)[source]

Pillow that listens to changes and invalidates the cache whether it’s a single doc being cached or a view.

Processors:
  • corehq.pillows.cache_invalidate_pillow.CacheInvalidateProcessor

corehq.apps.change_feed.pillow.get_change_feed_pillow_for_db(pillow_id, couch_db, default_topic=None)[source]

Generic pillow for inserting Couch documents into Kafka.

Reads from:
  • CouchDB

Writes to:
  • Kafka

Processors

class corehq.pillows.user.UnknownUsersProcessor[source]

Monitors forms for user_ids we don’t know about and creates an entry in ES for the user.

Reads from:
  • Kafka topics: form-sql, form

  • XForm data source

Writes to:
  • UserES index

class corehq.apps.change_feed.pillow.KafkaProcessor(data_source_type, data_source_name, default_topic)[source]

Generic processor for CouchDB changes to put those changes in a kafka topic

Reads from:
  • CouchDB change feed

Writes to:
  • Specified kafka topic

  • DeletedCouchDoc SQL table

class corehq.pillows.groups_to_user.GroupsToUsersProcessor[source]

When a group changes, this updates the user doc in UserES

Reads from:
  • Kafka topics: group

  • Group data source (CouchDB)

Writes to:
  • UserES index

corehq.pillows.group.get_group_to_elasticsearch_processor()[source]

Inserts group changes into ES

Reads from:
  • Kafka topics: group

  • Group data source (CouchDB)

Writes to:
  • GroupES index

class corehq.pillows.ledger.LedgerProcessor[source]

Updates ledger section and entry combinations (exports), daily consumption and case location ids

Reads from:
  • Kafka topics: ledger

  • Ledger data source

Writes to:
  • LedgerSectionEntry postgres table

  • Ledger data source

class corehq.pillows.cacheinvalidate.CacheInvalidateProcessor[source]

Invalidates cached CouchDB documents

Reads from:
  • CouchDB

Writes to:
  • Redis

class corehq.pillows.synclog.UserSyncHistoryProcessor[source]

Updates the user document with reporting metadata when a user syncs

Note when USER_REPORTING_METADATA_BATCH_ENABLED is True that this is written to a postgres table. Entries in that table are then batched and processed separately.

Reads from:
  • CouchDB (user)

  • SynclogSQL table

Writes to:
  • CouchDB (user) (when batch processing disabled) (default)

  • UserReportingMetadataStaging (SQL) (when batch processing enabled)

class pillowtop.processors.form.FormSubmissionMetadataTrackerProcessor[source]

Updates the user document with reporting metadata when a user submits a form

Also marks the application as having submissions.

Note when USER_REPORTING_METADATA_BATCH_ENABLED is True that this is written to a postgres table. Entries in that table are then batched and processed separately

Reads from:
  • CouchDB (user and app)

  • XForm data source

Writes to:
  • CouchDB (app)

  • CouchDB (user) (when batch processing disabled) (default)

  • UserReportingMetadataStaging (SQL) (when batch processing enabled)

class corehq.apps.userreports.pillow.ConfigurableReportPillowProcessor(table_manager)[source]

Generic processor for UCR.

Reads from:
  • SQLLocation

  • Form data source

  • Case data source

Writes to:
  • UCR database

class pillowtop.processors.elastic.ElasticProcessor(adapter, doc_filter_fn=None, change_filter_fn=None)[source]

Generic processor to transform documents and insert into ES.

Processes one document at a time.

Reads from:
  • Usually Couch

  • Sometimes SQL

Writes to:
  • ES

class pillowtop.processors.elastic.BulkElasticProcessor(adapter, doc_filter_fn=None, change_filter_fn=None)[source]

Generic processor to transform documents and insert into ES.

Processes one “chunk” of changes at a time (chunk size specified by pillow).

Reads from:
  • Usually Couch

  • Sometimes SQL

Writes to:
  • ES

corehq.pillows.case_search.get_case_search_processor()[source]

Case Search

Reads from:
  • Case data source

Writes to:
  • Case Search ES index

class corehq.messaging.pillow.CaseMessagingSyncProcessor[source]
Reads from:
  • Case data source

  • Update Rules

Writes to:
  • PhoneNumber

  • Runs rules for SMS (can be many different things)