Batch Persistence

One of our primary goals is to have usable data in the database. We therefore want to make sure that the error rate is not too high before considering a new ingest pipeline's worth of writes for a region valid. For this purpose, writes can optionally be batched; when the pipeline is complete, we check its total ingest successes and failures before committing to persist the results to the database.

Batching individual tasks

In scrape-based ingest, we batch writes by default across all scraped regions. In non-batched persistence for scrapers, each individual scrape task that generates a new entity graph immediately writes it to the database and terminates. In batched persistence, these tasks instead write a BatchIngestInfoData object to a Pub/Sub topic: on successful ingest, the object carries the ingested entity graph as an IngestInfo object; on error, it carries the error that occurred.
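
As a rough illustration, the end of a scrape task's persist step branches on whether batching is enabled. The sketch below is hypothetical (as are its function and parameter names), not the actual implementation.

```python
from typing import Any, Callable


def persist_scrape_result(
    ingest_info: Any,
    batch_enabled: bool,
    write_to_database: Callable[[Any], None],
    publish_batch_message: Callable[[Any], None],
) -> None:
    """Hypothetical sketch of the branch at the end of a scrape task."""
    if batch_enabled:
        # Batched persistence: defer the write by publishing the result
        # (as a BatchIngestInfoData message) to the region's Pub/Sub topic.
        publish_batch_message(ingest_info)
    else:
        # Non-batched persistence: write the entity graph immediately;
        # the task then terminates.
        write_to_database(ingest_info)
```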

In direct ingest, we do not batch writes by default, since we naturally consolidate all writes into a single transaction for a single ingested file or request. However, batch persistence can be optionally enabled for any ingest process. The examples below are described within the context of scrape-based ingest.

BatchIngestInfoData

The fields in BatchIngestInfoData are:

  1. task_hash - the hash of the task which created this object
  2. ingest_info - the ingested entity graph, if the task succeeded
  3. error - an error message, if the task failed
  4. trace_id - the trace id of the failed request, if the task failed
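
For illustration, the message can be thought of as a small data class with those four fields. The definition below is only a sketch of that shape, not the class from the codebase, and the IngestInfo type is stubbed out.

```python
from dataclasses import dataclass
from typing import Any, Optional

# Stand-in for the real entity-graph type; the actual IngestInfo class
# lives elsewhere in the codebase.
IngestInfo = Any


@dataclass(frozen=True)
class BatchIngestInfoData:
    # Hash of the task that produced this message; used to de-duplicate
    # retries and redelivered messages.
    task_hash: int
    # The ingested entity graph, populated only on success.
    ingest_info: Optional[IngestInfo] = None
    # Error message, populated only on failure.
    error: Optional[str] = None
    # Trace id of the failed request, populated only on failure.
    trace_id: Optional[str] = None
```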

Writing errors

If the task fails, the failure is caught and a BatchIngestInfoData is published with the task_hash, error, and trace_id. Note that a failed task still returns a 500 error, so it is retried some number of times. Each retry incurs another failed write to Pub/Sub, and these duplicates are later de-duplicated using the task_hash.
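
Publishing a failure might look roughly like the following with the google-cloud-pubsub client; the project id, topic name, and JSON payload format are assumptions for illustration, not the codebase's actual setup.

```python
import json

from google.cloud import pubsub_v1

# Assumed identifiers, for illustration only.
PROJECT_ID = "my-project"
TOPIC_ID = "v1.ingest.batch"


def publish_failure(task_hash: int, error: str, trace_id: str) -> None:
    """Publishes a failure message for a task.

    Retries of the same task publish messages with the same task_hash,
    which is what lets the batch endpoint de-duplicate them later.
    """
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
    payload = json.dumps(
        {"task_hash": task_hash, "error": error, "trace_id": trace_id}
    ).encode("utf-8")
    # publish() returns a future; result() blocks until delivery is confirmed.
    publisher.publish(topic_path, data=payload).result()
```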

Writing successes

If the task succeeds, a BatchIngestInfoData is published with the task_hash and ingest_info. Note that we still include the task_hash here because Pub/Sub does not have exactly-once delivery semantics, so it is used to de-duplicate successes as well.
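
The success payload mirrors the failure case, carrying the serialized entity graph instead of an error and published to the same topic. A minimal sketch, assuming JSON transport and an already-serialized graph (the hypothetical serialized_ingest_info parameter):

```python
import json


def build_success_payload(task_hash: int, serialized_ingest_info: str) -> bytes:
    """Builds the success message body; publishing is identical to the
    failure sketch above. serialized_ingest_info is a hypothetical
    stand-in for however the IngestInfo graph is serialized for transport.
    """
    return json.dumps(
        {"task_hash": task_hash, "ingest_info": serialized_ingest_info}
    ).encode("utf-8")
```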

Writing to the database

Once the ingest pipeline is complete and all BatchIngestInfoData messages have been published to Pub/Sub, the batch persistence endpoint is called, with a query parameter indicating which region should be written. The batch persistence flow goes through the following steps:

  1. Read all messages for the given region from Pub/Sub
  2. For every message, check whether we have already seen a success from the same task: if so, discard it. A single success means any failures from that task can be ignored (a task can fail twice and pass on the third attempt, for example), and this also protects against duplicate success messages being delivered (see the sketch after this list).
  3. Convert and merge each remaining message's ingest_info object into a single entity graph to be persisted, and only pass it down through the rest of the persistence layer if a sufficient number of objects passed conversion successfully.
  4. Similarly, the persistence layer counts errors for each individual person in the merged entity graph, whether in conversion to the ORM models or in entity matching, and fails the write if too many people failed.
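
A sketch of the de-duplication and threshold checks, under some assumptions: the message structure, the success-overrides-failure rule, and the 10% failure threshold below are illustrative, not the values or data structures used in the codebase.

```python
from typing import Any, Dict, List


def deduplicate_batch_messages(
    messages: List[Dict[str, Any]],
) -> Dict[int, Dict[str, Any]]:
    """Keeps at most one message per task_hash, preferring successes.

    A task may publish several failures before eventually succeeding, and
    Pub/Sub may redeliver any message more than once, so a success for a
    given task_hash overrides any failures from that same task.
    """
    by_task: Dict[int, Dict[str, Any]] = {}
    for message in messages:
        task_hash = message["task_hash"]
        existing = by_task.get(task_hash)
        is_success = message.get("ingest_info") is not None
        kept_is_success = (
            existing is not None and existing.get("ingest_info") is not None
        )
        if existing is None or (is_success and not kept_is_success):
            by_task[task_hash] = message
    return by_task


def should_persist_batch(
    deduplicated: Dict[int, Dict[str, Any]], max_failure_ratio: float = 0.1
) -> bool:
    """Decides whether enough tasks succeeded to commit the batch write.

    The 10% threshold is an illustrative default, not the real value.
    """
    if not deduplicated:
        return False
    failures = sum(
        1 for m in deduplicated.values() if m.get("ingest_info") is None
    )
    return failures / len(deduplicated) <= max_failure_ratio
```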

Multi-threaded pulls from Pub/Sub

A single pull from Pub/Sub often fails to return all of the messages on a topic in a timely fashion. To work around this, we spawn 5 threads that all pull from the topic at the same time and return their results. We keep spawning rounds of 5 threads and reading, stopping if and only if all 5 threads in a round return no messages.
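
A minimal sketch of that loop, with the single Pub/Sub pull abstracted behind a pull_once callable (standing in for an actual Pub/Sub pull request) so the example stays self-contained:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List

NUM_THREADS = 5


def pull_all_messages(pull_once: Callable[[], List[Any]]) -> List[Any]:
    """Repeatedly issues NUM_THREADS concurrent pulls and stops only after
    a round in which every pull returns no messages."""
    all_messages: List[Any] = []
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        while True:
            # Run NUM_THREADS pulls concurrently and collect their results.
            batches = list(executor.map(lambda _: pull_once(), range(NUM_THREADS)))
            round_messages = [msg for batch in batches for msg in batch]
            if not round_messages:
                # Every pull in this round came back empty; treat the
                # topic as drained.
                break
            all_messages.extend(round_messages)
    return all_messages
```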