Election year has finally rolled back around. You know what that means? Millions, if not billions, of dollars are being spent sending candidates to Capitol Hill and the White House. In 1974, the Federal Election Commission (FEC) was created to oversee every federal election in the United States, and as a result, every candidate's financial records were made openly accessible to the public. This project uses that FEC data to build a dashboard monitoring election finances across the 2024 election cycle.
After figuring out what data I would need, I made an ERD based on the data documentation provided by the FEC. Each table has an ingestion DAG to move the data from the CSVs to an S3 bucket.
Data Orchestration:
- Apache Airflow
Data Processing/Cleaning:
- Polars
- Pandas
Data Warehouse and Staging:
- AWS S3
- Snowflake
Data Transformations:
- dbt (Data Build Tool)
- SQL (of course)
Data Visualization / BI:
- Metabase
Other:
- AWS IAM (Identity and Access Management for AWS)
- Notion (Project management / Documentation)
- Docker (App containerization)
There are two types of ingestion DAGs:
- DAGs that download the bulk data from the FEC website.
- DAGs that scrape data straight from the tables on the FEC website. These DAGs are reserved for data in "type" tables (i.e. Report Type, Transaction Type, and Political Parties).
Example of a "Bulk Data" DAG (TLDR - Full code: here):
For these types of DAGs, I chose to convert the files into Parquet because it decreased my DAG runtimes and cut costs on S3 storage (only $5, but hey, money is money!). For example, for the "Individual Contributions" DAG, the runtime went from 15m 57s → 5m 25s** and the file size went from 3.4 GB → 528.7 MB.
** = Not including the DAG trigger.
Example of a "Web Scraper" DAG (TLDR - Full code: here):
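Since the full code is linked above, here is only a hedged sketch of the parsing step, assuming the type tables are plain HTML `<table>` elements that pandas can read; the function names and URL handling are my own illustration:

```python
import io

import pandas as pd
import requests

def parse_type_table(html: str) -> pd.DataFrame:
    """Parse the first HTML table on a page into a DataFrame."""
    # read_html returns every <table> on the page; assume the
    # first one is the code table we want.
    return pd.read_html(io.StringIO(html))[0]

def scrape_type_table(url: str) -> pd.DataFrame:
    """Fetch an FEC documentation page and parse its type table.

    The URL is a placeholder -- the real DAG would point at the
    FEC page for the given code table (Report Type, etc.).
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    return parse_type_table(resp.text)
```

These tables are tiny (dozens of rows), so scraping them on each run is cheap compared to the bulk downloads.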
Before loading the data into the data warehouse, I had to make the tables (see ERD) and create an S3 stage for each table. To make the S3 stages, I had to allow Snowflake access to my "./FEC" S3 bucket, so I created a custom role for Snowflake in AWS IAM and then created the stages using a script like this:
* Snowflake recommends using CREATE STORAGE INTEGRATION to give access to S3 instead of using the AWS key ID and AWS secret key!
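A sketch of what that setup could look like, with the SQL held as Python strings the way a setup script might. The integration name, role ARN, bucket, and stage naming convention are assumptions, not the project's actual values:

```python
# Hypothetical names throughout -- substitute your own account ID,
# role, bucket, and schema.
CREATE_INTEGRATION = """
CREATE STORAGE INTEGRATION IF NOT EXISTS fec_s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/snowflake-fec-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://fec/');
"""

def make_stage_sql(table: str) -> str:
    """Build the CREATE STAGE statement for one table's S3 prefix."""
    return f"""
CREATE STAGE IF NOT EXISTS ELECTION.RAW.{table}_stage
  STORAGE_INTEGRATION = fec_s3_integration
  URL = 's3://fec/{table}/'
  FILE_FORMAT = (TYPE = PARQUET);
"""
```

With the storage integration in place, Snowflake assumes the IAM role itself, so no long-lived AWS credentials ever live in the SQL.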
A corresponding COPY INTO DAG is triggered at the end of both types of ingestion DAGs in the previous section. These DAGs follow these steps:
- The DAG receives a run date as a parameter.
- Truncate the table in Snowflake to avoid duplicate data.
- Copy the data into the table using the run date parameter.
- Run DBT models*
* This step isn't automated because I run dbt locally instead of dbt Cloud, and Airflow wasn't cooperating with dbt.
Here's an example using the Candidates table:
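As a hedged sketch (the schema, stage name, and file-naming pattern are my assumptions, not the project's exact conventions), the truncate-and-copy steps for Candidates might be generated like this:

```python
def copy_into_statements(table: str, run_date: str) -> list[str]:
    """Return the SQL the COPY INTO DAG would run, in order.

    `run_date` is the DAG parameter from step 1; assumes files in
    the stage embed the run date in their names.
    """
    return [
        # Step 2: truncate so a re-run can't insert duplicates.
        f"TRUNCATE TABLE ELECTION.RAW.{table};",
        # Step 3: load only the files written for this run date.
        f"""COPY INTO ELECTION.RAW.{table}
            FROM @ELECTION.RAW.{table}_stage
            PATTERN = '.*{run_date}.*\\.parquet'
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;""",
    ]
```

Each statement would then be executed in order through the Snowflake connector inside the DAG's tasks.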
I used dbt to make the data models consumed by Metabase. All my source tables are placed in the "Election.Raw" schema, and all the models created with dbt are placed in the "Election.Analytics" schema. Below are the lineage graphs of the "src" models and the "fct" views built from them. If you want to look at the full dbt project, look here.
Lineage Graphs (Click to make readable):
| src_candidates | src_committee_transactions |
| --- | --- |
| src_committees | src_independent_exp |
| src_individual_cont | src_operating_exp |