Ingest Pipeline
Regardless of ingest channel or protocol, our ingest pipeline transforms raw incoming data into our common schema and persists it into the database for downstream processing and analysis. The pipeline has some key design characteristics, meant to keep jurisdiction onboarding time low and platform extensibility high.
- Configuration-driven - new source data systems do not necessarily require new code to navigate the organizational structure or map fields from their system-specific schema to our common schema. Instead, incoming raw data is provided alongside a flat `yaml` configuration to our data extractors, which automatically reorganize entities and map their fields.
- General-purpose - the ingest pipeline consists of a series of functional steps used for all pipelines. Each step has a straightforward interface and one to a few implementations to allow for some variation.
- Override-capable - different stages in the pipeline have hooks that allow for jurisdiction-specific overrides. We try to limit the number of overrides and have a bias towards generalizing functionality, within reason, but there are a few steps where overrides are an absolute necessity.
Each step in the pipeline has a functional interface, taking in some input type and returning an output type to be passed downstream to the next function. Each function is stateless and might take in additional inputs beyond the main item of transformation. Functions tend to operate on collections of objects that correspond to the ingested entity graph rooted at a particular person, e.g. a collection of `ingest_info` objects where each object contains a pre-normalized entity graph, or a collection of person objects where each person includes some graph of attached child entities.
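The functional, stateless shape of these steps can be sketched as follows. This is a simplified illustration, not the real implementation: `IngestInfo`, `extract`, and `convert` here are invented stand-ins for the actual types and steps.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical stand-in for the real IngestInfo entity graph type.
@dataclass
class IngestInfo:
    people: List[dict] = field(default_factory=list)

def extract(content: str) -> IngestInfo:
    """Sketch of a step: parse raw content into a pre-normalized entity graph."""
    return IngestInfo(people=[{"full_name": line} for line in content.splitlines() if line])

def convert(ingest_info: IngestInfo) -> List[dict]:
    """Sketch of a later step: normalize fields and emit distinct person objects."""
    return [{"full_name": p["full_name"].title()} for p in ingest_info.people]

def run_pipeline(content: str) -> List[dict]:
    # Stateless functions compose: each step's output is the next step's input.
    return convert(extract(content))

run_pipeline("john doe\njane roe")
# [{'full_name': 'John Doe'}, {'full_name': 'Jane Roe'}]
```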
The pipeline runs single-threaded within each task but is parallelized across multiple processes: each package of source data is run through all pipeline steps in succession in its own task. For example, a flat file provided through direct ingest is parsed and passed through the full pipeline in one thread, but each flat file provided in a nightly upload from the jurisdiction is given its own task for execution. In scrape-based ingest, the response from an outbound scraping request is passed through the full pipeline in one thread, but each such request is given its own task for execution.
The inputs, outputs, and functionality of each step are defined below.
- Inputs: raw source data as a string of `content` and an extraction `yaml` configuration file
- Outputs: an `IngestInfo` entity graph
- Functionality: The data extractor converts raw content into an organized `IngestInfo` entity graph, re-organizing incoming entities into our own schema but not yet normalizing individual fields.
- Notable implementations: `CsvDataExtractor`, `HtmlDataExtractor`, `JsonDataExtractor`
- Other notes: A data extractor can optionally take in an existing `IngestInfo` and organize incoming data into that entity graph. Additionally, data extractors are one of the few stateful steps, as they track information about progress made earlier in the content.
- Jurisdiction overrides?: Yes, both in the sense that the `yaml` configuration input is jurisdiction-specific and with optional pre- and post-processing hooks specific to a given region
- Inputs: an `IngestInfo` entity graph
- Outputs: None; raises an error if there are validation errors
- Functionality: This validates that the entity graph in the ingest info object adheres to basic schematic assumptions. The exact set of validation rules will expand over time.
- Jurisdiction overrides?: No
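A minimal sketch of what such schematic validation might look like; the rule shown (every person needs a name or an external id) is invented for illustration and is not the pipeline's actual rule set.

```python
def validate_ingest_info(ingest_info: dict) -> None:
    """Raise if the entity graph violates basic schematic assumptions (sketch)."""
    errors = []
    for i, person in enumerate(ingest_info.get("people", [])):
        # Hypothetical rule: a person must carry a name or an external id.
        if not person.get("full_name") and not person.get("external_id"):
            errors.append(f"person {i}: missing both full_name and external_id")
    if errors:
        # Collect all violations, then fail once with the full list.
        raise ValueError("; ".join(errors))

validate_ingest_info({"people": [{"full_name": "DOE JOHN"}]})  # passes silently
```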
- Inputs: an `IngestInfo` entity graph and an `IngestMetadata` context object
- Outputs: a list of person objects hydrated with attached children from the ingested graph
- Functionality: The ingest info converter does two key things: 1) restructures the more consolidated `ingest_info` graph structure into a set of distinct entities which reference each other, and 2) normalizes all fields in all ingested entities. Normalization includes coding raw string text into allowable enumerated values (for enum types) and converting other fields into their appropriate types. This may also involve inferring the values of certain fields based on the values of other fields.
- Notable implementations: `CountyConverter`, `StateConverter` (for handling the structure of the county- and state-level schemas), plus many type-specific helper functions that each convert a specific entity class.
- Other notes: The output object also includes any errors that occurred during the normalization process, broken down into a few key error types (enum parsing, general parsing, protected class errors). These errors are checked after this step to determine whether to continue further in the pipeline.
- Jurisdiction overrides?: No
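The enum-coding half of normalization might look roughly like this. `ChargeStatus` and the coding table are hypothetical; real converters cover many more fields, entity types, and error categories.

```python
from enum import Enum

# Hypothetical enum and raw-text coding table, invented for illustration.
class ChargeStatus(Enum):
    PENDING = "PENDING"
    CONVICTED = "CONVICTED"

CHARGE_STATUS_MAP = {
    "PEND": ChargeStatus.PENDING,
    "GUILTY": ChargeStatus.CONVICTED,
}

def normalize_charge(raw: dict, errors: list) -> dict:
    """Code a raw status string into an enum value, collecting enum-parsing errors."""
    status_text = raw.get("status", "").strip().upper()
    status = CHARGE_STATUS_MAP.get(status_text)
    if status is None:
        # Record the failure instead of raising, so all errors can be
        # tallied and checked after the step completes.
        errors.append(("enum_parsing", status_text))
    return {"status": status}

errors = []
charge = normalize_charge({"status": " guilty "}, errors)
# charge["status"] is ChargeStatus.CONVICTED and errors stays empty
```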
- Inputs: a list of person objects hydrated with attached children from the ingested graph
- Outputs: a list of the person objects successfully validated, and the number of failed validations
- Functionality: This operates on the level of each root person object, looking at each person and its child entities all at once, to check that specific entities meet certain assumptions. The exact set of validation rules will expand over time.
- Notable implementations: `validate_county_person`, `validate_state_person`
- Jurisdiction overrides?: No
- Inputs: a list of person objects hydrated with attached children from the ingested graph
- Outputs: a list of person objects with fully hydrated entity graphs based on ingested state and previously written state
- Functionality: The entity matcher does a few important things: 1) matches ingested persons to database persons, 2) matches non-person entities to non-person database entities, and 3) matches and updates relationships among entities in the updated entity graph. Matching of non-person entities is based on provided external ids and cosine similarity, while matching of person entities is based on provided external ids and personally identifiable information. Relationship matching is more complex: it must deal with the fact that new entities and entity updates arrive in unpredictable orders over time, and with the various ways entities may change as a result.
- Notable implementations: `county_entity_matcher`, `state_entity_matcher`
- Other notes: The output object also includes any errors that occurred during the matching process, as well as any entities that were found to be "orphaned," i.e. after the matching process, the entity no longer has a direct chain of ancestor entities back to a particular person, though it may have a person reference anyway. These error counts are used to determine whether to continue with persistence to the database.
- Jurisdiction overrides?: Yes, optional pre- and post-processing hooks specific to a given region
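A simplified sketch of the external-id-based person matching path, omitting the cosine-similarity and PII-based matching described above; all names here are illustrative, not the actual matcher API.

```python
def match_people(ingested: list, db_people: list) -> list:
    """Match ingested persons to database persons by external_id (sketch),
    merging ingested fields over the matched database record."""
    by_external_id = {p["external_id"]: p for p in db_people if p.get("external_id")}
    merged = []
    for person in ingested:
        match = by_external_id.get(person.get("external_id"))
        if match:
            merged.append({**match, **person})  # ingested values take precedence
        else:
            merged.append(person)  # no match: treat as a brand-new person
    return merged

db = [{"person_id": 1, "external_id": "A1", "full_name": "OLD NAME"}]
incoming = [{"external_id": "A1", "full_name": "NEW NAME"}, {"external_id": "B2"}]
match_people(incoming, db)
# [{'person_id': 1, 'external_id': 'A1', 'full_name': 'NEW NAME'}, {'external_id': 'B2'}]
```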
- Inputs: a list of person objects with fully hydrated entity graphs based on ingested state and previously written state
- Outputs: whether or not persistence was successful
- Functionality: After entity matching, we have a full entity graph, rooted at a person, that we are ready to upsert into the database. Among the person and its child entities, we determine which entities are brand new and which already exist -- new entities are inserted and their first historical snapshots are opened (specifying `valid_from` and leaving `valid_to` unspecified); existing entities are updated, their existing historical snapshots are closed (specifying `valid_to`), and new historical snapshots are opened.
- Notable implementations: `persistence`, `batch_persistence`, `historical_snapshot_updater`
- Jurisdiction overrides?: No
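The snapshot bookkeeping can be sketched as follows, assuming a simple in-memory list of snapshot dicts rather than the real database tables; function and field names are illustrative.

```python
from datetime import datetime, timezone

def update_snapshots(entity_id, snapshots, now=None):
    """Close any open historical snapshot for the entity (set valid_to)
    and open a new one (valid_from set, valid_to left unset)."""
    now = now or datetime.now(timezone.utc)
    for snap in snapshots:
        if snap["entity_id"] == entity_id and snap["valid_to"] is None:
            snap["valid_to"] = now  # close the previously open snapshot
    # Open the new snapshot; valid_to stays None until the next update.
    snapshots.append({"entity_id": entity_id, "valid_from": now, "valid_to": None})

snaps = []
update_snapshots(7, snaps, now=datetime(2020, 1, 1, tzinfo=timezone.utc))  # new entity
update_snapshots(7, snaps, now=datetime(2020, 6, 1, tzinfo=timezone.utc))  # later update
# snaps[0] is now closed (valid_to set); snaps[1] is the open snapshot
```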