Pillows

What they are

A pillow is a subscriber to a change feed. When a change is published, the pillow receives the document, performs some calculation or transform, and publishes the result to another database.

Creating a pillow

All pillows inherit from the ConstructedPillow class. A pillow consists of a few parts, which the sketch after this list ties together:

  1. Change Feed

  2. Checkpoint

  3. Processor(s)

  4. Change Event Handler
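
Putting these parts together, a minimal sketch of a pillow factory might look like the following. This is illustrative only: the import paths and signatures are assumptions based on the pillowtop and change feed modules referenced on this page, and the topic, processor logic, and checkpoint frequency are hypothetical. See the get_*_pillow factories listed under Pillows below for real examples.

    from corehq.apps.change_feed import topics
    from corehq.apps.change_feed.consumer.feed import (
        KafkaChangeFeed,
        KafkaCheckpointEventHandler,
    )
    from pillowtop.checkpoints.manager import PillowCheckpoint
    from pillowtop.pillow.interface import ConstructedPillow
    from pillowtop.processors.interface import PillowProcessor


    class MyProcessor(PillowProcessor):
        """Transforms each change and publishes it to another database."""

        def process_change(self, change):  # signature may vary between versions
            doc = change.get_document()
            # ... transform `doc` and write the result to the target database ...


    def get_my_pillow(pillow_id='my-pillow', num_processes=1, process_num=0):
        change_feed = KafkaChangeFeed(
            topics=[topics.CASE_SQL],  # hypothetical topic choice
            client_id=pillow_id,
            num_processes=num_processes,
            process_num=process_num,
        )
        # the checkpoint is stored as json and records where to resume the feed
        checkpoint = PillowCheckpoint(pillow_id, change_feed.sequence_format)
        return ConstructedPillow(
            name=pillow_id,
            checkpoint=checkpoint,
            change_feed=change_feed,
            processor=MyProcessor(),
            # fires after each processed change and saves the checkpoint periodically
            change_processed_event_handler=KafkaCheckpointEventHandler(
                checkpoint=checkpoint,
                checkpoint_frequency=100,
                change_feed=change_feed,
            ),
        )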

Change Feed

Change feeds are documented in the Changes Feed section of these docs.

The 10,000-foot view: a change feed publishes changes to which you can subscribe.
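
As a sketch (topic and client id hypothetical, signatures assumed from the change feed consumer module used elsewhere on this page), subscribing directly looks roughly like:

    from corehq.apps.change_feed import topics
    from corehq.apps.change_feed.consumer.feed import KafkaChangeFeed

    feed = KafkaChangeFeed(topics=[topics.CASE_SQL], client_id='demo-consumer')
    # iterate over changes published after the given sequence;
    # a pillow runs this loop for you
    for change in feed.iter_changes(since=None, forever=False):
        print(change.id, change.get_document())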

Checkpoint

The checkpoint is a json field that tells the processor where to start in the change feed.

Processor(s)

A processor handles the transformation or calculation and publishes the result to a database. Most pillows have only one processor, but it can make sense to combine processors into one pillow when you are only iterating over a small number of documents (such as custom reports).

When creating a processor you should be aware of how much time it will take to process each record. A useful baseline is:

86400 seconds per day / expected changes per day = how long your processor can take per change

Note that it should be faster than this, as most changes come in bursts rather than evenly distributed throughout the day.
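
For example, with a hypothetical volume of one million changes per day:

    # seconds in a day divided by a hypothetical 1,000,000 changes/day
    budget_seconds = 86400 / 1_000_000  # 0.0864 s
    # so each change should finish in well under ~86 ms to leave headroom for bursts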

Change Event Handler

This fires after each change has been processed. The main use case is to save the checkpoint to the database.

Error Handling

Pillow errors are handled by saving them to the PillowError model. A celery queue reads from this model and retries the errors on the pillow.
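
For example, to see how many errors are queued for one pillow from a Django shell (the model path matches the PillowError model above; the pillow field name and pillow name here are assumptions for illustration):

    from corehq.apps.pillow_retry.models import PillowError

    # count queued errors for a single pillow (field and pillow name assumed)
    PillowError.objects.filter(pillow='case-pillow').count()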

Monitoring

There are several datadog metrics with the prefix commcare.change_feed that can be helpful for monitoring pillows. Generally these metrics have tags for pillow name, topic, and partition to filter on.

Each metric below is prefixed with commcare.change_feed (omitted for brevity):

  • change_lag: the current time minus the time the most recently processed change was put into the queue

  • changes.count: number of changes processed

  • changes.success: number of changes processed successfully

  • changes.exceptions: number of changes whose processing raised an exception

  • processor.timing: time spent processing a document, with separate tags for the extract/transform/load steps

  • processed_offsets: latest offset that has been processed by the pillow

  • current_offsets: the current offsets of each partition in kafka (useful for math in dashboards)

  • need_processing: current_offsets - processed_offsets
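
For example, a dashboard query for the lag of a single pillow might look like the following sketch (tag names taken from the tags mentioned above; the pillow name is hypothetical):

    max:commcare.change_feed.change_lag{pillow_name:case-pillow} by {topic,partition}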

Generally when planning for pillows, you should:
  • Minimize change_lag
    • for up-to-date reports for users

  • Minimize changes.exceptions
    • for consistency between primary and reporting databases

    • because exceptions mean those changes must be reprocessed at a later time (effectively adding more load and lag later)

  • Minimize number of pillows running
    • for fewer server resources needed

The ideal setup would have 1 pillow with no exceptions and 0 second lag.

Troubleshooting

A pillow is falling behind

A pillow can fall behind for a few reasons:

  1. The processor is too slow for the number of changes coming in (i.e. change_lag for that pillow is very high).

  2. There has been an issue with the change feed that has caused the checkpoint to be “rewound”.

  3. Many exceptions happen during the day, which requires pillows to reprocess the same changes later.

Optimizing a processor

To solve #1, use the monitoring metrics described above to pinpoint the issue. commcare.change_feed.processor.timing can help determine which processors/pillows are the root cause of slow processing.

If this is a UCR pillow, use the profile_data_source management command to profile the expensive data sources.

Parallel Processors

To scale pillows horizontally, do the following:

  1. Look for which pillows are behind. This can be found in the change feed dashboard or the hq admin system info page.

  2. Ensure you have enough resources on the pillow server to scale the pillows. This can be verified through datadog.

  3. Decide which topics need added partitions in kafka. There is no way to scale a couch pillow horizontally, and you cannot remove partitions once added, so scale in small increments. Also try to choose partition counts that pillows can split easily; powers of 2 are easiest.

  4. Run ./manage.py add_kafka_partition <topic> <number partitions to have>

  5. In the commcare-cloud repo, edit the environments/<env>/app-processes.yml file and change num_processes for the pillows you want to scale (see the sketch after this list).

  6. On the next deploy, multiple processes will be used when starting pillows.
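
A hypothetical sketch of the relevant part of app-processes.yml (host and pillow names illustrative; the exact layout varies by environment):

    pillows:
      pillow0:
        case-pillow:
          num_processes: 4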

Note that pillows automatically divide partitions among processes based on the number of partitions and the number of processes for the pillow. The mapping does not have to be one to one, and you do not have to specify it manually. That means you can create more partitions than you currently need without changing the number of pillow processes, then just restart pillows for the change to take effect. Later you can change the number of processes without touching the number of partitions; just update the supervisor conf and restart pillows.
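
As an illustration only (this is not the actual assignment code), eight partitions spread round-robin over three processes shows how the mapping need not be one to one:

    # hypothetical round-robin assignment of partitions to pillow processes
    num_partitions, num_processes = 8, 3
    assignment = {p: p % num_processes for p in range(num_partitions)}
    # {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2, 6: 0, 7: 1}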

The UCR pillows also have options to split one pillow into multiple pillows: ucr_division, include_ucrs and exclude_ucrs. See the pillow code for more information on these.

Rewound Checkpoint

Occasionally checkpoints will be “rewound” to a previous state, causing pillows to process changes that have already been processed. This usually happens when a couch node fails over to another. If this occurs, stop the pillow, wait for confirmation that the couch nodes are up, and fix the checkpoint using: ./manage.py fix_checkpoint_after_rewind <pillow_name>

Many pillow exceptions

commcare.change_feed.changes.exceptions has an exception_type tag that reports the name and path of the exception encountered. These exceptions can come from coding errors or from infrastructure issues. If they come from infrastructure issues (e.g. ES timeouts), some solutions could be:

  • Scale ES cluster (more nodes, shards, etc)

  • Reduce number of pillow processes that are writing to ES

  • Reduce other usages of ES if possible (e.g. if some custom code relies on ES, could it use UCRs instead? see https://github.com/dimagi/commcare-hq/pull/26241)

Problem with checkpoint for pillow name: First available topic offset for topic is num1 but needed num2

This happens when the earliest offset that kafka still retains for a topic is after the offset at which the pillow's checkpoint wants to start. This often happens if a pillow has been stopped for a month and has not been removed from the settings.

To fix this you should verify that the pillow is no longer needed in the environment. If it isn’t, you can delete the checkpoint and re-deploy. This should eventually be followed up by removing the pillow from the settings.

If the pillow is needed and should be running, you're in a bit of a pickle: the pillow cannot get the required document ids from kafka, and it won't be clear which documents the pillow has and has not processed. The safest fix is to force the pillow to go through all relevant docs. Once this process is started, you can move the checkpoint for that pillow to the most recent offset for its topic.

Pillows

corehq.pillows.case.get_case_pillow(pillow_id='case-pillow', ucr_division=None, include_ucrs=None, exclude_ucrs=None, num_processes=1, process_num=0, ucr_configs=None, skip_ucr=False, processor_chunk_size=10, topics=None, **kwargs)[source]

Return a pillow that processes cases. The processors include UCR and Elasticsearch processors.

Processors:
corehq.pillows.xform.get_xform_pillow(pillow_id='xform-pillow', ucr_division=None, include_ucrs=None, exclude_ucrs=None, num_processes=1, process_num=0, ucr_configs=None, skip_ucr=False, processor_chunk_size=10, topics=None, **kwargs)[source]

Generic XForm change processor

Processors:
corehq.pillows.case.get_case_to_elasticsearch_pillow(pillow_id='CaseToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

Return a pillow that processes cases to Elasticsearch.

Processors:
corehq.pillows.xform.get_xform_to_elasticsearch_pillow(pillow_id='XFormToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

XForm change processor that sends form data to Elasticsearch

Processors:
corehq.pillows.user.get_user_pillow(pillow_id='user-pillow', num_processes=1, process_num=0, skip_ucr=False, processor_chunk_size=10, **kwargs)[source]

Processes users and sends them to ES and UCRs.

Processors:
corehq.pillows.user.get_user_pillow_old(pillow_id='UserPillow', num_processes=1, process_num=0, **kwargs)[source]

Processes users and sends them to ES.

Processors:
corehq.apps.userreports.pillow.get_location_pillow(pillow_id='location-ucr-pillow', include_ucrs=None, num_processes=1, process_num=0, ucr_configs=None, **kwargs)[source]

Processes updates to locations for UCR

Note this is only applicable if a domain on the environment has the LOCATIONS_IN_UCR flag enabled.

Processors:
corehq.pillows.groups_to_user.get_group_pillow(pillow_id='group-pillow', num_processes=1, process_num=0, **kwargs)[source]

Group pillow

Processors:
corehq.pillows.group.get_group_pillow_old(pillow_id='GroupPillow', num_processes=1, process_num=0, **kwargs)[source]

Group pillow (old). Sends Group data to Elasticsearch

Processors:
corehq.pillows.groups_to_user.get_group_to_user_pillow(pillow_id='GroupToUserPillow', num_processes=1, process_num=0, **kwargs)[source]

Group pillow that updates user data in Elasticsearch with group membership

Processors:
corehq.pillows.ledger.get_ledger_to_elasticsearch_pillow(pillow_id='LedgerToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

Ledger pillow

Note that this pillow’s id references Elasticsearch, but it no longer saves to ES. It has been kept to keep the checkpoint consistent, and can be changed at any time.

Processors:
corehq.pillows.domain.get_domain_kafka_to_elasticsearch_pillow(pillow_id='KafkaDomainPillow', num_processes=1, process_num=0, **kwargs)[source]

Domain pillow to replicate documents to ES

Processors:
corehq.pillows.sms.get_sql_sms_pillow(pillow_id='SqlSMSPillow', num_processes=1, process_num=0, processor_chunk_size=10, **kwargs)[source]

SMS Pillow

Processors:
corehq.apps.userreports.pillow.get_kafka_ucr_pillow(pillow_id='kafka-ucr-main', ucr_division=None, include_ucrs=None, exclude_ucrs=None, topics=None, num_processes=1, process_num=0, processor_chunk_size=10, **kwargs)[source]

UCR pillow that reads from all Kafka topics and writes data into the UCR database tables.

Processors:
corehq.apps.userreports.pillow.get_kafka_ucr_static_pillow(pillow_id='kafka-ucr-static', ucr_division=None, include_ucrs=None, exclude_ucrs=None, topics=None, num_processes=1, process_num=0, processor_chunk_size=10, **kwargs)[source]

UCR pillow that reads from all Kafka topics and writes data into the UCR database tables.

Only processes static UCR datasources (configuration lives in the codebase instead of the database).

corehq.pillows.synclog.get_user_sync_history_pillow(pillow_id='UpdateUserSyncHistoryPillow', num_processes=1, process_num=0, **kwargs)[source]

Synclog pillow

Processors:
corehq.pillows.application.get_app_to_elasticsearch_pillow(pillow_id='ApplicationToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

App pillow

Processors:
corehq.pillows.app_submission_tracker.get_form_submission_metadata_tracker_pillow(pillow_id='FormSubmissionMetadataTrackerPillow', num_processes=1, process_num=0, **kwargs)[source]

This gets a pillow which iterates through all forms and marks the corresponding app as having submissions.

corehq.pillows.user.get_unknown_users_pillow(pillow_id='unknown-users-pillow', num_processes=1, process_num=0, **kwargs)[source]

This pillow adds users from incoming xform submissions to the User Index if they don't exist in HQ

Processors:
corehq.messaging.pillow.get_case_messaging_sync_pillow(pillow_id='case_messaging_sync_pillow', topics=None, num_processes=1, process_num=0, processor_chunk_size=10, **kwargs)[source]

Pillow for synchronizing messaging data with case data.

Processors:
corehq.pillows.case_search.get_case_search_to_elasticsearch_pillow(pillow_id='CaseSearchToElasticsearchPillow', num_processes=1, process_num=0, **kwargs)[source]

Populates the case search Elasticsearch index.

Processors:
  • corehq.pillows.case_search.CaseSearchPillowProcessor

corehq.pillows.cacheinvalidate._get_cache_invalidation_pillow(pillow_id, couch_db, couch_filter=None)[source]

Pillow that listens to changes and invalidates the cache whether it’s a single doc being cached or a view.

Processors:
  • corehq.pillows.cacheinvalidate.CacheInvalidateProcessor

corehq.apps.change_feed.pillow.get_change_feed_pillow_for_db(pillow_id, couch_db, default_topic=None)[source]

Generic pillow for inserting Couch documents into Kafka.

Reads from:
  • CouchDB

Writes to:
  • Kafka

Processors

class corehq.pillows.user.UnknownUsersProcessor[source]

Monitors forms for user_ids we don’t know about and creates an entry in ES for the user.

Reads from:
  • Kafka topics: form-sql, form

  • XForm data source

Writes to:
  • UserES index

class corehq.apps.change_feed.pillow.KafkaProcessor(data_source_type, data_source_name, default_topic)[source]

Generic processor for CouchDB changes to put those changes in a kafka topic

Reads from:
  • CouchDB change feed

Writes to:
  • Specified kafka topic

class corehq.pillows.groups_to_user.GroupsToUsersProcessor[source]

When a group changes, this updates the user doc in UserES

Reads from:
  • Kafka topics: group

  • Group data source (CouchDB)

Writes to:
  • UserES index

corehq.pillows.group.get_group_to_elasticsearch_processor()[source]

Inserts group changes into ES

Reads from:
  • Kafka topics: group

  • Group data source (CouchDB)

Writes to:
  • GroupES index

class corehq.pillows.ledger.LedgerProcessor[source]

Updates ledger section and entry combinations (exports), daily consumption and case location ids

Reads from:
  • Kafka topics: ledger

  • Ledger data source

Writes to:
  • LedgerSectionEntry postgres table

  • Ledger data source

class corehq.pillows.cacheinvalidate.CacheInvalidateProcessor[source]

Invalidates cached CouchDB documents

Reads from:
  • CouchDB

Writes to:
  • Redis

class corehq.pillows.synclog.UserSyncHistoryProcessor[source]

Updates the user document with reporting metadata when a user syncs

Note when USER_REPORTING_METADATA_BATCH_ENABLED is True that this is written to a postgres table. Entries in that table are then batched and processed separately.

Reads from:
  • CouchDB (user)

  • SynclogSQL table

Writes to:
  • CouchDB (user) (when batch processing disabled) (default)

  • UserReportingMetadataStaging (SQL) (when batch processing enabled)

class pillowtop.processors.form.FormSubmissionMetadataTrackerProcessor[source]

Updates the user document with reporting metadata when a user submits a form

Also marks the application as having submissions.

Note when USER_REPORTING_METADATA_BATCH_ENABLED is True that this is written to a postgres table. Entries in that table are then batched and processed separately.

Reads from:
  • CouchDB (user and app)

  • XForm data source

Writes to:
  • CouchDB (app)

  • CouchDB (user) (when batch processing disabled) (default)

  • UserReportingMetadataStaging (SQL) (when batch processing enabled)

class corehq.apps.userreports.pillow.ConfigurableReportPillowProcessor(data_source_providers, ucr_division=None, include_ucrs=None, exclude_ucrs=None, bootstrap_interval=10800, run_migrations=True)[source]

Generic processor for UCR.

Reads from:
  • SQLLocation

  • Form data source

  • Case data source

Writes to:
  • UCR database

class pillowtop.processors.elastic.ElasticProcessor(elasticsearch, index_info, doc_prep_fn=None, doc_filter_fn=None)[source]

Generic processor to transform documents and insert into ES.

Processes one document at a time.

Reads from:
  • Usually Couch

  • Sometimes SQL

Writes to:
  • ES

class pillowtop.processors.elastic.BulkElasticProcessor(elasticsearch, index_info, doc_prep_fn=None, doc_filter_fn=None)[source]

Generic processor to transform documents and insert into ES.

Processes one “chunk” of changes at a time (chunk size specified by pillow).

Reads from:
  • Usually Couch

  • Sometimes SQL

Writes to:
  • ES

corehq.pillows.case_search.get_case_search_processor()[source]

Case Search

Reads from:
  • Case data source

Writes to:
  • Case Search ES index

class corehq.messaging.pillow.CaseMessagingSyncProcessor[source]
Reads from:
  • Case data source

  • Update Rules

Writes to:
  • PhoneNumber

  • Runs rules for SMS (can be many different things)