Ingest Pipeline

Regardless of ingest channel or protocol, our ingest pipeline transforms raw incoming data into our common schema and persists it into the database for downstream processing and analysis. The pipeline has some key design characteristics, meant to keep jurisdiction onboarding time low and platform extensibility high.

  • Configuration-driven - new source data systems do not necessarily require new code to navigate the organizational structure or map fields from their system-specific schema to our common schema. Instead, incoming raw data is provided alongside flat yaml configuration to our data extractors, which automatically reorganize entities and map their fields.
  • General-purpose - the ingest pipeline consists of a series of functional steps shared across all ingest channels. Each step has a straightforward interface and one to a few implementations to allow for some variation.
  • Override-capable - different stages in the pipeline have hooks to allow for jurisdiction-specific overrides (a minimal sketch of this mechanism follows this list). We try to limit the number of overrides and have a bias towards generalizing functionality, within reason, but there are a few steps where overrides are an absolute necessity.
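
To make the override-capable design concrete, here is a minimal sketch of how a pipeline step could apply region-specific hooks around a generic implementation. The registry structure and names below are assumptions for illustration, not the actual mechanism.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical registries of region-specific hooks, keyed by region code.
# A hook is any callable that takes and returns the step's working object
# (e.g. an IngestInfo graph). These names are invented for this sketch.
PRE_PROCESS_HOOKS: Dict[str, Callable[[Any], Any]] = {}
POST_PROCESS_HOOKS: Dict[str, Callable[[Any], Any]] = {}


def run_step_with_overrides(region_code: str,
                            content: Any,
                            generic_step: Callable[[Any], Any]) -> Any:
    """Runs a generic pipeline step, applying optional region overrides."""
    pre_hook: Optional[Callable] = PRE_PROCESS_HOOKS.get(region_code)
    post_hook: Optional[Callable] = POST_PROCESS_HOOKS.get(region_code)

    if pre_hook:
        content = pre_hook(content)

    result = generic_step(content)

    if post_hook:
        result = post_hook(result)
    return result
```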

Pipeline Structure

Each step in the pipeline has a functional interface, taking in some input type and returning an output type to be passed downstream to the next function. With few exceptions, each function is stateless, and it may take in additional inputs beyond the main item of transformation. Functions tend to operate on collections of objects corresponding to the ingested entity graph rooted at a particular person, e.g. a collection of ingest_info objects, each containing a pre-normalized entity graph, or a collection of person objects, each with some graph of attached child entities.
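
As a rough illustration of this functional structure, the sketch below strings together hypothetical step signatures matching the stages described on this page; the type aliases and function names are stand-ins for illustration, not the actual definitions.

```python
from typing import Any, Dict, List

# Hypothetical stand-ins for the real types handled by the pipeline.
IngestInfo = Dict[str, Any]   # pre-normalized entity graph
Person = Dict[str, Any]       # normalized person with attached child entities


def extract(raw_content: str, yaml_config: str) -> IngestInfo:
    """Data extraction: raw content plus yaml config -> IngestInfo graph."""
    raise NotImplementedError


def validate_graph(ingest_info: IngestInfo) -> None:
    """Graph validation: raises if basic schematic assumptions are violated."""
    raise NotImplementedError


def convert(ingest_info: IngestInfo, metadata: Dict[str, Any]) -> List[Person]:
    """Normalization: IngestInfo graph -> hydrated person objects."""
    raise NotImplementedError


def validate_entities(people: List[Person]) -> List[Person]:
    """Entity validation: returns only the people that pass validation."""
    raise NotImplementedError


def match(people: List[Person]) -> List[Person]:
    """Entity matching: merge ingested people with previously written state."""
    raise NotImplementedError


def persist(people: List[Person]) -> bool:
    """Persistence: upsert the matched entity graphs; returns success."""
    raise NotImplementedError


def run_pipeline(raw_content: str, yaml_config: str,
                 metadata: Dict[str, Any]) -> bool:
    # Each step is a stateless function whose output feeds the next step.
    ingest_info = extract(raw_content, yaml_config)
    validate_graph(ingest_info)
    people = convert(ingest_info, metadata)
    validated = validate_entities(people)
    matched = match(validated)
    return persist(matched)
```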

The pipeline is run single-threaded but parallelized across multiple processes, where each package of source data is fully run through pipeline steps in succession. For example, a flat file provided through direct ingest is parsed and passed through the full pipeline in one thread, but each flat file provided in a nightly upload from the jurisdiction is given its own task for execution. In scrape-based ingest, the response from an outbound scraping request will be passed through the full pipeline in one thread, but each such request is given its own task for execution.
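
A rough sketch of this task model, assuming a simple process pool in place of the actual task infrastructure and reusing the hypothetical run_pipeline from the sketch above:

```python
from concurrent.futures import ProcessPoolExecutor
from typing import List


def run_pipeline_for_file(file_path: str) -> bool:
    # Within a single task, the flat file is parsed and passed through
    # every pipeline step in succession on one thread.
    with open(file_path) as f:
        raw_content = f.read()
    # "extraction_config.yaml" is a placeholder for this sketch.
    return run_pipeline(raw_content, "extraction_config.yaml", metadata={})


def run_nightly_upload(file_paths: List[str]) -> List[bool]:
    # Each flat file in the nightly upload is given its own task.
    with ProcessPoolExecutor() as executor:
        return list(executor.map(run_pipeline_for_file, file_paths))
```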

The inputs, outputs, and functionality of each step are defined below.

Data Extraction

Read more here.

  • Inputs: raw source data as a string of content and an extraction yaml configuration file
  • Outputs: an IngestInfo entity graph
  • Functionality: The data extractor converts raw content into an organized IngestInfo entity graph, reorganizing incoming entities into our own schema but not yet normalizing individual fields (a simplified sketch follows this list).
  • Notable implementations: CsvDataExtractor, HtmlDataExtractor, JsonDataExtractor
  • Other notes: A data extractor can optionally take in an existing IngestInfo and organize incoming data into that entity graph. Additionally, data extraction is one of the few stateful steps, as extractors track progress made earlier in the content.
  • Jurisdiction overrides?: Yes: the yaml configuration input is jurisdiction-specific, and there are optional pre- and post-processing hooks specific to a given region
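
To make this concrete, here is a heavily simplified sketch of CSV extraction driven by a yaml mapping. The mapping keys, column names, and output shape below are invented for illustration and do not reflect the actual configuration format or the CsvDataExtractor implementation.

```python
import csv
import io

import yaml  # PyYAML, assumed available for this sketch

# Invented mapping: raw source columns -> fields in our common schema.
EXAMPLE_CONFIG = """
person:
  INMATE_NAME: full_name
  DOB: birthdate
booking:
  BOOKING_DT: admission_date
"""


def extract_csv(raw_content: str, config_yaml: str) -> dict:
    """Toy extractor: builds a pre-normalized, IngestInfo-like dict."""
    config = yaml.safe_load(config_yaml)
    ingest_info = {"people": []}
    for row in csv.DictReader(io.StringIO(raw_content)):
        # Reorganize the row into our schema; fields are not yet normalized.
        person = {field: row[col] for col, field in config["person"].items()}
        person["bookings"] = [
            {field: row[col] for col, field in config["booking"].items()}]
        ingest_info["people"].append(person)
    return ingest_info


if __name__ == "__main__":
    raw = 'INMATE_NAME,DOB,BOOKING_DT\n"DOE, JOHN",01/02/1980,05/06/2019\n'
    print(extract_csv(raw, EXAMPLE_CONFIG))
```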

Graph Validation

  • Inputs: an IngestInfo entity graph
  • Outputs: None; raises an error if any validation rule fails
  • Functionality: This step validates that the entity graph in the IngestInfo object adheres to basic schematic assumptions (a hypothetical example check is sketched below). The exact set of validation rules will expand over time.
  • Jurisdiction overrides?: No
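
A minimal sketch of what a schematic check in this step might look like; the specific rule shown (unique person ids within one graph) is a hypothetical example, not the actual rule set.

```python
def validate_graph(ingest_info: dict) -> None:
    """Raises ValueError if the graph violates basic schematic assumptions."""
    seen_ids = set()
    for person in ingest_info.get("people", []):
        person_id = person.get("person_id")
        if person_id is None:
            continue
        # Hypothetical rule: person ids must be unique within a single graph.
        if person_id in seen_ids:
            raise ValueError(f"Duplicate person_id in graph: {person_id}")
        seen_ids.add(person_id)
```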

Normalization

Read more here.

  • Inputs: an IngestInfo entity graph and an IngestMetadata context object
  • Outputs: a list of person objects hydrated with attached children from the ingested graph
  • Functionality: The ingest info converter does two key things: 1) restructures the consolidated ingest_info graph into a set of distinct entities that reference each other, and 2) normalizes all fields in all ingested entities. Normalization includes the coding of raw string text into allowable enumerated values (for enum types) and the conversion of other fields into their appropriate types; it may also involve inferring the values of certain fields from the values of other fields (a simplified sketch follows this list).
  • Notable implementations: CountyConverter, StateConverter (for handling the structure of the county- and state-level schemas, respectively), plus many type-specific helper functions, each of which converts a specific entity class.
  • Other notes: The output object also includes any errors that occurred during the normalization process, broken down into a few key error types (enum parsing, general parsing, protected class errors). These errors are checked after this step to determine whether to continue further in the pipeline.
  • Jurisdiction overrides?: No
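
A simplified sketch of the two kinds of field normalization described above, with an invented enum mapping and date format; the real mappings and parsing logic are far more extensive.

```python
import datetime
from enum import Enum


class Race(Enum):
    BLACK = "BLACK"
    WHITE = "WHITE"
    EXTERNAL_UNKNOWN = "EXTERNAL_UNKNOWN"


# Invented raw-text-to-enum mapping; illustrative only.
RACE_MAP = {
    "B": Race.BLACK,
    "BLACK": Race.BLACK,
    "W": Race.WHITE,
    "WHITE": Race.WHITE,
    "U": Race.EXTERNAL_UNKNOWN,
}


def normalize_race(raw: str) -> Race:
    # Coding of raw string text into an allowable enumerated value.
    return RACE_MAP[raw.strip().upper()]


def normalize_date(raw: str) -> datetime.date:
    # Conversion of a non-enum field into its appropriate type.
    return datetime.datetime.strptime(raw.strip(), "%m/%d/%Y").date()
```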

Entity Validation

  • Inputs: a list of person objects hydrated with attached children from the ingested graph
  • Outputs: a list of the person objects successfully validated, and the number of failed validations
  • Functionality: This step operates at the level of each root person object, looking at the person and its child entities all at once to check that specific entities meet certain assumptions (a simplified sketch follows this list). The exact set of validation rules will expand over time.
  • Notable implementations: validate_county_person, validate_state_person
  • Jurisdiction overrides?: No
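
A simplified sketch of the shape of this step; the single rule shown (each person must carry an external id) is a hypothetical example, not the actual rule set.

```python
from typing import Dict, List, Tuple

Person = Dict[str, object]  # stand-in for the real person entity type


def validate_people(people: List[Person]) -> Tuple[List[Person], int]:
    """Returns the people that pass validation and the count of failures."""
    validated: List[Person] = []
    failures = 0
    for person in people:
        # Hypothetical rule: a person and its children are only kept if the
        # person carries an external_id usable for later entity matching.
        if person.get("external_id"):
            validated.append(person)
        else:
            failures += 1
    return validated, failures
```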

Entity Matching

  • Inputs: a list of person objects hydrated with attached children from the ingested graph
  • Outputs: a list of person objects with fully hydrated entity graphs based on ingested state and previously written state
  • Functionality: The entity matcher does a few important things: 1) matches ingested persons to database persons, 2) matches non-person entities to non-person database entities, and 3) matches and updates relationships among entities in the updated entity graph. Matching of non-person entities is based on provided external ids and cosine similarity, while matching of person entities is based on provided external ids and personally identifiable information (a simplified sketch of person matching follows this list). Relationship matching is more complex: it must account for the fact that new entities and entity updates arrive in unpredictable orders over time, and that entities may consequently change in various ways.
  • Notable implementations: county_entity_matcher, state_entity_matcher
  • Other notes: The output object also includes any errors that occurred during the matching process, as well as any entities found to be "orphaned," i.e. entities that, after matching, no longer have a direct chain of ancestor entities back to a particular person, though they may still hold a person reference. These error counts are used to determine whether to continue with persistence to the database.
  • Jurisdiction overrides?: Yes, optional pre- and post-processing hooks specific to a given region
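
A simplified sketch of the person-matching portion only, assuming dict-shaped entities; non-person matching (external ids plus cosine similarity) and relationship matching are omitted, and the fields used here are illustrative.

```python
from typing import Dict, List, Optional

Person = Dict[str, object]  # stand-in for the real person entity type


def find_db_match(ingested: Person, db_people: List[Person]) -> Optional[Person]:
    """Finds the database person corresponding to an ingested person, if any."""
    # First preference: a provided external id.
    external_id = ingested.get("external_id")
    if external_id:
        for db_person in db_people:
            if db_person.get("external_id") == external_id:
                return db_person
    # Fallback: personally identifiable information (illustrative fields only).
    for db_person in db_people:
        if (db_person.get("full_name") == ingested.get("full_name")
                and db_person.get("birthdate") == ingested.get("birthdate")):
            return db_person
    return None


def match_people(ingested_people: List[Person],
                 db_people: List[Person]) -> List[Person]:
    """Merges each ingested person with its database match, if one exists."""
    merged = []
    for person in ingested_people:
        db_match = find_db_match(person, db_people)
        if db_match:
            # Carry over database-only fields (e.g. the primary key) so that
            # persistence performs an update rather than an insert.
            person = {**db_match, **person}
        merged.append(person)
    return merged
```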

Persistence

  • Inputs: a list of person objects with fully hydrated entity graphs based on ingested state and previously written state
  • Outputs: whether or not persistence was successful
  • Functionality: After entity matching, we have a full entity graph, rooted around a person, that we are ready to upsert into the database. Among the person and its child entities, we determine which entities are brand new and which already exist: new entities are inserted and their first historical snapshots are opened (specifying valid_from and leaving valid_to unspecified); existing entities are updated, their open historical snapshots are closed (specifying valid_to), and new historical snapshots are opened (a simplified sketch follows this list).
  • Notable implementations: persistence, batch_persistence, historical_snapshot_updater
  • Jurisdiction overrides?: No
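
A simplified, in-memory sketch of the snapshot bookkeeping described above; the table structures, field names, and id handling are assumptions for illustration, not the actual persistence layer.

```python
import datetime
from typing import Dict, List

# Toy in-memory stand-ins for a primary table and its history table.
people_table: Dict[int, dict] = {}
person_history_table: List[dict] = []


def upsert_person(person: dict, now: datetime.datetime) -> None:
    """Upserts a person row and maintains its historical snapshots."""
    person_id = person.get("person_id")
    if person_id in people_table:
        # Existing entity: update it and close its currently open snapshot.
        people_table[person_id] = person
        for snapshot in person_history_table:
            if snapshot["person_id"] == person_id and snapshot["valid_to"] is None:
                snapshot["valid_to"] = now
    else:
        # New entity: insert it before opening its first snapshot.
        person_id = person_id if person_id is not None else len(people_table) + 1
        person["person_id"] = person_id
        people_table[person_id] = person
    # In both cases, open a fresh snapshot with valid_from set and
    # valid_to left unspecified.
    person_history_table.append(
        {"person_id": person_id, "state": dict(person),
         "valid_from": now, "valid_to": None})
```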