Data Warehouse

Much of our calculation work is expansive both vertically and horizontally: it looks across all of a given population and over large periods of time. These queries can put significant strain on a standard relational database, which we want to keep performant for ongoing ingest processing. As such, we perform all of our calculation work in our data warehouse, BigQuery.

Structure

BigQuery is organized into datasets, which in turn contain tables and views. Queries can be written which reference tables or views in different datasets, but the user or service account executing the query must have permissions to read from or write to each dataset.
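As a minimal sketch of what this looks like in practice, the query below joins tables from two datasets using the Python BigQuery client. The project, dataset, and table names are hypothetical stand-ins, not the real warehouse identifiers.

```python
from google.cloud import bigquery

# Hypothetical project, dataset, and table names. The executing user or service
# account must have read access to every dataset referenced in the query.
client = bigquery.Client(project="my-warehouse-project")

query = """
    SELECT person.person_id, metric.metric_value
    FROM `my-warehouse-project.state.state_person` AS person
    JOIN `my-warehouse-project.dataflow_metrics.some_metric_table` AS metric
      ON person.person_id = metric.person_id
"""

# Run the query and iterate over the result rows.
for row in client.query(query).result():
    print(row.person_id, row.metric_value)
```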

At present, our warehouse is organized in the following fashion:

  • Separate BigQuery clusters exist for staging and production, with no intermingling of data or user permissions
  • Each cluster is divided into datasets as follows:
    • A state dataset which includes all of the individual-level data exported from our state corrections and supervision schema
    • A county dataset which includes all of the individual-level data exported from our county corrections schema, as well as the scrape-based aggregate ingest output with which it is frequently joined
    • A Dataflow output dataset which includes all of the metrics produced by our batch processing pipelines
    • A dataset for each consumer that relies on metric packages exported from BigQuery into Cloud Storage, e.g. a dataset for the views that power our Dashboard application
    • Datasets containing exogenous data dumps that we join against our own datasets for analysis, e.g. data from public sources or from partner criminal justice organizations
    • User-created datasets used for exploratory analysis

Orchestration

A pipeline exists to export our individual-level data to the warehouse, run our batch calculation pipelines based on the latest data, re-materialize SQL views based on both the latest individual-level data and the latest batch output, and then export those views as needed for different consumers.
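The re-materialization step can be sketched with the BigQuery client by re-running a view's query into a destination table with WRITE_TRUNCATE. The view and table names below are hypothetical, and the real pipeline may materialize views differently; this is only an approximation of the step described above.

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-warehouse-project")

# Hypothetical view query and materialized-table name.
view_query = "SELECT * FROM `my-warehouse-project.dashboard_views.revocations_by_month`"
destination = bigquery.TableReference.from_string(
    "my-warehouse-project.dashboard_views.revocations_by_month_materialized"
)

job_config = bigquery.QueryJobConfig(
    destination=destination,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Re-run the view's query and overwrite the materialized table with the result.
client.query(view_query, job_config=job_config).result()
```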

Export from Cloud SQL

Every day, our staging and production databases are exported in full to our staging and production data warehouses, respectively. The export workflow is as follows:

  1. A daily cron job launches a request to the /export_manager/create_export_tasks endpoint in the main platform application layer, for a particular schema
  2. The request handler retrieves a list of all tables in that schema that should be exported and enqueues an export task for each table to a Cloud Tasks queue for BigQuery exports
    1. The request handler also enqueues an export monitor task to a separate queue for monitoring the status of exports
  3. Each export task exports the given table to CSV and then loads the CSV into BigQuery (see the first sketch after this list)
    1. The task calls a Cloud SQL API for exporting the given table to a CSV file, stored temporarily in a secured Cloud Storage bucket
      1. Because the Cloud SQL API only supports one export operation per instance at a time (returning client errors otherwise), the task polls a Cloud SQL export status API until the operation is successful, and then returns
    2. The task calls a BigQuery API for importing CSV files from Cloud Storage into a particular dataset and table. The dataset is retrievable as static configuration
      1. The BigQuery import is configured as WRITE_TRUNCATE, meaning that it overwrites the previous state of the table in full each time
      2. Though not required by any BigQuery limitation, the task polls a BigQuery API for checking load status until the operation is successful, and then returns
    3. Once the BigQuery import is complete, the export task is finished
  4. While the different export tasks have been running on the export queue, the export monitor task has been determining whether the full export is complete by inspecting the size of the export queue (see the second sketch after this list)
    1. If any tasks remain in that queue, the export is not finished; in that case, the monitor task enqueues another monitor task with a delay, to avoid a tight monitoring loop
    2. If the export queue is now empty, the full export to our data warehouse is complete. The task responds by publishing a message to the warehouse export Pub/Sub topic, where it gets picked up in the next section
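
The two sketches below approximate this workflow; the project, instance, bucket, table, queue, topic, and endpoint names are hypothetical, and they are illustrations of the steps above rather than the production code. The first sketch covers an individual export task: asking the Cloud SQL Admin API to export a table to CSV, polling the operation until it completes, and then loading the CSV into BigQuery with WRITE_TRUNCATE.

```python
import time

from google.cloud import bigquery
from googleapiclient import discovery

PROJECT = "my-platform-project"        # hypothetical project name
INSTANCE = "my-cloud-sql-instance"     # hypothetical Cloud SQL instance
CSV_URI = "gs://my-secure-export-bucket/state_person.csv"  # hypothetical bucket

# Step 3.1: ask Cloud SQL to export the table to CSV in Cloud Storage, then poll
# the operation until it finishes (the API allows one export per instance at a time).
sqladmin = discovery.build("sqladmin", "v1beta4")
operation = sqladmin.instances().export(
    project=PROJECT,
    instance=INSTANCE,
    body={
        "exportContext": {
            "fileType": "CSV",
            "uri": CSV_URI,
            "csvExportOptions": {"selectQuery": "SELECT * FROM state_person"},
        }
    },
).execute()

while True:
    status = sqladmin.operations().get(
        project=PROJECT, operation=operation["name"]
    ).execute()
    if status["status"] == "DONE":
        break
    time.sleep(5)

# Step 3.2: load the CSV into BigQuery, overwriting the previous table contents.
bq_client = bigquery.Client(project="my-warehouse-project")
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
load_job = bq_client.load_table_from_uri(
    CSV_URI, "my-warehouse-project.state.state_person", job_config=job_config
)
load_job.result()  # Block until the load (and thus the export task) is complete.
```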
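The second sketch approximates the monitor task: it inspects the export queue and either re-enqueues itself with a delay or publishes the completion message to the warehouse export Pub/Sub topic. The monitoring endpoint shown here is a hypothetical name, not the real handler.

```python
import datetime

from google.cloud import pubsub_v1, tasks_v2
from google.protobuf import timestamp_pb2

PROJECT = "my-platform-project"            # hypothetical names throughout
LOCATION = "us-east1"
EXPORT_QUEUE = "bigquery-export-queue"
MONITOR_QUEUE = "bigquery-export-monitor-queue"
TOPIC = "warehouse-export-complete"

tasks_client = tasks_v2.CloudTasksClient()
export_queue_path = tasks_client.queue_path(PROJECT, LOCATION, EXPORT_QUEUE)

# Step 4: check whether any export tasks are still pending on the export queue.
remaining = list(tasks_client.list_tasks(parent=export_queue_path))

if remaining:
    # Step 4.1: exports are still running; schedule another monitor task with a
    # delay so we do not poll in a tight loop.
    run_at = timestamp_pb2.Timestamp()
    run_at.FromDatetime(datetime.datetime.utcnow() + datetime.timedelta(seconds=60))
    monitor_queue_path = tasks_client.queue_path(PROJECT, LOCATION, MONITOR_QUEUE)
    tasks_client.create_task(
        parent=monitor_queue_path,
        task={
            "schedule_time": run_at,
            "app_engine_http_request": {
                "http_method": tasks_v2.HttpMethod.POST,
                # Hypothetical monitoring endpoint.
                "relative_uri": "/export_manager/check_export_status",
            },
        },
    )
else:
    # Step 4.2: the queue is empty, so the full export is done; announce it.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT, TOPIC)
    publisher.publish(topic_path, data=b"Cloud SQL export to BigQuery complete")
```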

Post-export calculations

Once a daily export from Cloud SQL to BigQuery has completed, a message is published to a specific Pub/Sub topic as noted in the final step of the previous section. Different consumers can register with Pub/Sub to listen to this message and launch further downstream processes.

The primary such consumer at present is a Cloud Function that triggers a directed acyclic graph (DAG) in Cloud Composer, which orchestrates our Dataflow batch calculation pipelines. All of the pipelines run concurrently. When a state's pipelines are finished, a message including the state's code is published to a Pub/Sub topic.
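The shape of such a consumer can be sketched as a background Cloud Function subscribed to the export-complete topic. The function name is hypothetical, and the actual call that starts the Composer DAG is omitted, since it depends on the environment's configuration.

```python
import base64


def trigger_calculation_dag(event, context):
    """Background Cloud Function subscribed to the warehouse export Pub/Sub topic.

    The function name is illustrative; the real function would go on to call the
    Cloud Composer (Airflow) environment to start the calculation DAG.
    """
    # Pub/Sub-triggered functions receive the message payload base64-encoded.
    message = base64.b64decode(event["data"]).decode("utf-8") if event.get("data") else ""
    print(f"Received export-complete message: {message}")
    # ... authenticate to the Composer environment and trigger the calculation DAG ...
```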

Another Cloud Function listens to the batch processing Pub/Sub topic and triggers the materialization of specific views for specific consumers, in the form of metric packages. The primary use case at present is metric packages for our Dashboard: the results of re-running the queries for a specific set of BigQuery views are exported as JSON files to a Cloud Storage bucket, where the Dashboard's backend retrieves them and passes them to the frontend for visualization.
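One way to sketch this export step is a BigQuery extract job that writes a materialized view table to Cloud Storage as newline-delimited JSON. The table and bucket names are hypothetical, and the real export may serialize query results differently.

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-warehouse-project")

# Hypothetical materialized view table and export bucket.
job_config = bigquery.ExtractJobConfig(
    destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
)
extract_job = client.extract_table(
    "my-warehouse-project.dashboard_views.revocations_by_month_materialized",
    "gs://my-dashboard-export-bucket/revocations_by_month.json",
    job_config=job_config,
)
extract_job.result()  # Wait for the JSON file to land in Cloud Storage.
```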

Manual access

At this point, the datasets for both our ingested data and our calculations have been automatically updated to their freshest state in the warehouse and made available to consuming applications. Data scientists and researchers can also work with this information directly, given BigQuery access to the designated datasets.

Many human users can tolerate changes in the dataset and prefer the knowledge that they always have the freshest state of the underlying data. But users who prefer stable data checkpoints may clone a particular table or dataset and perform their work thereupon. This is a trivial operation in the BigQuery UI. Users are encouraged to keep these copies within their own sandboxed datasets.