Direct Ingest

Direct ingest is the most straightforward ingest channel in our system: a justice agency, government body, or other partner sends data directly into the Recidiviz platform. The orchestration logic for direct ingest is much simpler than scrape-based ingest, for the simple reason that we control the gateway through which information passes, whereas in scrape-based ingest we must be ready to parse information in any conceivable page structure.

The information we receive through direct ingest tends to be much richer in detail and time coverage, as it is acquired directly from the source, through partnership with the providing organization. This means that the normalization configuration tends to be more complex, and there tends to be at least a small amount of non-boilerplate logic involved to handle unique quirks in these systems.

General controller structure

All agencies from which we directly ingest information have a Controller implementation that inherits from a base controller for the kind of upload channel we have established with that agency. For example, with some state corrections agencies we receive data as a set of flat files uploaded to a secured, dedicated Cloud Storage bucket every night. These agencies have dedicated controllers which extend from the GcsfsDirectIngestController.
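As a rough sketch, a per-agency controller might look like the following. GcsfsDirectIngestController is the base class named above, but the import path, constructor arguments, and overridden methods here are illustrative assumptions rather than the platform's actual API:

```python
# Illustrative sketch of a per-agency controller. GcsfsDirectIngestController is the
# real base class named above, but the import path, constructor arguments, and
# overridden methods shown here are assumptions, not the platform's actual API.
from recidiviz.ingest.direct.controllers.gcsfs_direct_ingest_controller import (
    GcsfsDirectIngestController,  # import path assumed
)


class UsXxDirectIngestController(GcsfsDirectIngestController):
    """Direct ingest controller for a hypothetical state agency 'US_XX'."""

    def __init__(self):
        # Bind this controller to its agency's dedicated upload bucket.
        super().__init__(region_name='us_xx', bucket_name='direct-ingest-us-xx')

    def expected_file_tags(self):
        # The ordered list of flat files expected in one nightly upload session.
        return ['person', 'booking', 'charge', 'sentence']
```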

The workflow laid out below is written in the context of flat-file based upload, but the same general structure applies to other protocols for direct ingest: trigger an initial task to gather context, launch a task to determine whether there is something to ingest right now, launch a task to run ingest for a particular piece of content (flat file, database dump, API request), and repeat until we are confident that ingest is complete for this agency for now.
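As a toy illustration of that loop (every name below is made up for the example and is not the platform's real orchestration code), the scheduling decision amounts to processing expected files in order and waiting whenever the next expected file has not arrived yet:

```python
# Toy illustration of the task-chain structure described above; all names here are
# made up for the example and are not the platform's real orchestration code.
from collections import deque


def run_direct_ingest_session(expected_files, uploaded_files):
    """Process uploads in the expected order, waiting when a file has not arrived yet."""
    pending = deque(expected_files)
    processed = []
    while pending:
        next_file = pending[0]
        if next_file not in uploaded_files:
            # In the real system this is a delayed re-scheduling task, not a break.
            print(f'waiting: [{next_file}] has not been uploaded yet')
            break
        print(f'ingesting [{next_file}]')
        processed.append(pending.popleft())
    return processed


# Example: 'booking' has not arrived yet, so ingest stops and waits after 'person'.
run_direct_ingest_session(
    expected_files=['person', 'booking', 'charge'],
    uploaded_files={'person', 'charge'},
)
```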

The flow for a nightly direct ingest session looks like this:

  1. A cloud function is triggered when new content arrives, such as the upload of a file into a Cloud Storage bucket. The function gathers metadata about the file and the uploading agency and makes a request to the /direct/handle_direct_ingest_file endpoint in the main platform application layer.
  2. The handler for this endpoint checks the filename to determine whether the file has already been prepared for processing and whether it has already been processed.
    1. If the file needs to be prepared for processing, a task is launched to augment its filename with timestamp information and a prefix indicating that it has not yet been processed (a sketch of this tagging appears after this list). If the file is of significant size, it is also split into smaller files of a configurable size, each with a similarly augmented filename.
    2. If the file did not need to be prepared for processing, or has now been prepared, an ingest scheduling task is launched.
  3. The ingest scheduling task looks at the given file and agency, and checks against configuration for that agency to determine the set of expected files for a given upload session. The scheduler will decide to do one of two things: wait if ingest should not yet start (e.g. it's waiting for another file that we want to be ingested before this one, or another file is currently being ingested), or launch ingest for a particular file (this may not be the file that was just uploaded, if we determine that there's another file we'd rather ingest before this one).
    1. When the scheduler decides to wait, it queues up another scheduler task to repeat the previous step, with some delay to avoid a tight loop of checks.
    2. When the scheduler decides to launch ingest for a particular file, it queues up an ingest job task.
  4. The ingest job task invokes the run_ingest_job function on the base direct ingest controller, which reads the file, performs basic validation (delegating to child implementations), and hands the raw content off to the ingest pipeline with the data extractor configuration for that file and agency.
    1. If there is a transient failure during ingest, the ingest job task will be retried automatically.
    2. If there is a permanent error, i.e. something is wrong with the content preventing ingest from completing, then we stop direct ingest entirely by signaling an error status up to the orchestration layer. At this point, an engineer will receive an alert and need to intervene.
    3. If ingest succeeds for the file, the filename is augmented with a prefix noting that it has been processed, and another ingest scheduler task is launched to determine if there is another file to ingest right now.
  5. When there are no more files to ingest for the agency, and we do not expect any more to come soon, we move all of the files noted as processed out of the upload location into an archive location for safekeeping.
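The flow above leans on encoding state in filenames. A rough sketch of what that tagging could look like follows, with the exact prefixes and timestamp format being assumptions made for illustration:

```python
# Sketch of the filename tagging the flow above relies on. The exact prefixes and
# timestamp format used by the platform are assumptions made for illustration.
import datetime

UNPROCESSED_PREFIX = 'unprocessed'
PROCESSED_PREFIX = 'processed'


def to_unprocessed_path(original_filename, received_at=None):
    """Augment an uploaded filename with a timestamp and an 'unprocessed' prefix."""
    received_at = received_at or datetime.datetime.utcnow()
    timestamp = received_at.strftime('%Y-%m-%dT%H:%M:%S')
    return f'{UNPROCESSED_PREFIX}_{timestamp}_{original_filename}'


def to_processed_path(unprocessed_path):
    """Flip the prefix once ingest has succeeded for the file."""
    if not unprocessed_path.startswith(UNPROCESSED_PREFIX):
        raise ValueError(f'Unexpected path format: [{unprocessed_path}]')
    return PROCESSED_PREFIX + unprocessed_path[len(UNPROCESSED_PREFIX):]


# e.g. 'unprocessed_2019-10-08T04:00:00_elite_offenders.csv'
#  ->  'processed_2019-10-08T04:00:00_elite_offenders.csv'
```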

Concurrency

Note that we only ingest a single piece of content at a time (such as one flat file), i.e. the concurrency limit for the direct ingest queue for a given agency is 1. This is because our ingest pipeline assumes that the state of a given entity graph in the database is not going to change from one step to the next, so that we can proceed safely through the entire pipeline. The scale of criminal justice data, even for large agencies with rich information, is not high enough to demand that the ingest pipeline be more concurrency-friendly.
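For example, if the per-agency queue is managed through the Cloud Tasks API, the limit could be expressed roughly as follows. The project, region, and queue name are made up, and the platform may well configure its queues differently (e.g. via a queue configuration file):

```python
# Hedged sketch: defining a per-agency queue with a concurrency limit of 1 through
# the Cloud Tasks API. Project, region, and queue name are illustrative assumptions.
from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()
parent = 'projects/my-project/locations/us-east1'  # assumed project and region

client.create_queue(
    parent=parent,
    queue={
        'name': f'{parent}/queues/direct-ingest-us-xx',
        # Only one ingest job task may run at a time for this agency.
        'rate_limits': {'max_concurrent_dispatches': 1},
    },
)
```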

Historical to Nightly

New partnerships that result in direct ingest require the up-front exchange of a historical data dump (e.g. the most recent 10 years or more of data in identified systems), with the historical dump provided in the same format that nightly transfers will be sent in. This allows us to build and test the ingest mapping configurations, and to build up a base of data from which to power initial analysis and operations.

The process of getting the historical dump processed in production is an iterative one:

  1. Set up an integration test harness for the direct ingest controller (a minimal sketch of such a harness follows this list).
  2. Working with the agency research or IT staff, come up with a first pass of the ingest mappings for a given file or set of files.
    1. Create fixtures with fake data matching the structure of those files, and set up the test cases for those files in the integration tests, iterating until those tests pass.
    2. Run the real files through this test harness locally to see what the results of the ingest pipeline are for those files.
    3. If more changes are required, update the mappings (or controller hooks, or platform logic, as necessary), update fixture data to add new variety, and repeat this loop as necessary.
  3. When we have confidently run through this loop for all files in the historical data dump, validate assumptions and expected results with agency staff through a joint validation process.
    1. If more changes are required, go back to the previous loop and return to this step to validate with staff once more.
  4. If we and agency staff sign off on the mappings, run historical ingest through staging to ensure its compatibility with deployed infrastructure.
    1. Historical files tend to be very large (hundreds of thousands of rows in the case of CSV files for fundamental entities) and need to be split up. Uploading the large historical file into the staging upload bucket will cause the system to split it up automatically.
    2. Historical ingest may take several hours or more for larger agencies, so plan accordingly.
  5. If historical ingest proceeds successfully through staging, coordinate with agency staff to determine how to bridge the gap between the previously provided historical data dump and the first nightly transfer.
    1. The anecdotally easiest way to do this is to have another historical dump in the same format transferred on the day that nightly ingest is scheduled to begin (we call this a "historical refresh"). If historical ingest takes long enough that it's running when the nightly ingest comes in later, nightly files will wait until historical ingest is finished to begin. By the next day, we should be fully caught up and ready for subsequent nights.
    2. Another possible path is to proceed with the original historical dump and have agency IT staff produce a smaller dump that bridges the gap from the end of the historical dump to the first night. This is ultimately a superset of the complexity from the previous option, but some staff may prefer this route if producing a historical dump is more challenging in their system than producing a more recent, bounded dump.
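As a minimal, self-contained illustration of the fixture-driven loop in steps 1 and 2, a test might look like the following. The parser function, fixture structure, and test names are all assumptions for the sake of the example, not the platform's real test utilities:

```python
# Minimal illustration of the fixture-driven test loop described in steps 1-2.
# The parser function, fixture structure, and test names are all assumptions
# for the sake of the example, not the platform's real test utilities.
import unittest


def parse_person_rows(rows):
    """Stand-in for running a fixture file through the ingest mappings."""
    return [
        {'external_id': row['OFFENDER_ID'], 'full_name': row['NAME']}
        for row in rows
    ]


class TestUsXxPersonFile(unittest.TestCase):
    def test_person_fixture_produces_expected_entities(self):
        # Fixture data mirrors the structure of the agency's real file, with fake values.
        fixture_rows = [
            {'OFFENDER_ID': '12345', 'NAME': 'DOE, JANE'},
            {'OFFENDER_ID': '67890', 'NAME': 'ROE, RICHARD'},
        ]
        entities = parse_person_rows(fixture_rows)
        self.assertEqual(len(entities), 2)
        self.assertEqual(entities[0]['external_id'], '12345')


if __name__ == '__main__':
    unittest.main()
```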

Authentication

All ingest actions are made through server requests to our main platform application layer in Google App Engine, with options passed as query parameters. All request handlers within the Recidiviz platform require authentication and are callable only from platform services themselves (e.g. a cron job or a scraping task), or via an explicit API request by an authenticated user.

This is trivial to configure by adding the @authenticate_request decorator to request handler functions.
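A handler wired up this way might look roughly like the following. Only the @authenticate_request decorator and the endpoint path are named on this page; the blueprint, import path, and handler body are assumptions:

```python
# Sketch of attaching the decorator to a request handler. Only @authenticate_request
# and the endpoint path are named in the text above; the blueprint, import path, and
# handler body are illustrative assumptions.
from flask import Blueprint, request

from recidiviz.utils.auth import authenticate_request  # import path assumed

direct_ingest_blueprint = Blueprint('direct_ingest', __name__)


@direct_ingest_blueprint.route('/direct/handle_direct_ingest_file')
@authenticate_request
def handle_direct_ingest_file():
    """Only platform services or authenticated users can reach this handler."""
    region = request.args.get('region')
    file_path = request.args.get('file_path')
    # ... hand off to the ingest scheduling logic for this region and file ...
    return '', 200
```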