nssp pipeline code #1952
Conversation
Ran the pipeline and it seems to be pulling correctly, and the fields that are generated make sense. Generally looks good; I have some minor cosmetic suggestions.
I guess the archive differ is the only part that really remains?
Ran these as it suggests and things run fine. The linter is a little angry. Either way
```python
limit = 50000  # maximum limit allowed by SODA 2.0
while True:
    page = client.get("rdmq-nq56", limit=limit, offset=offset)
    if not page:
        break  # exit the loop if no more results
    results.extend(page)
    offset += limit
```
Suggested change:

```python
limit = 50_000
max_ii = 100  # sentinel: only stays at 100 if every page below is non-empty
for ii in range(100):
    page = client.get("rdmq-nq56", limit=limit, offset=offset)
    if not page:
        max_ii = ii
        break  # exit the loop if no more results
    results.extend(page)
    offset += limit
if max_ii == 100:
    raise ValueError("client has pulled 100x the socrata limit")
```
This is probably fine, but `while True` freaks me out. Feel free to use or not.
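An alternative sketch using `for`/`else` (same hypothetical `client`/`results`/`offset` setup as the suggestion above) drops the sentinel variable entirely; the `else` branch runs only if the loop finishes all 100 iterations without hitting `break`:

```python
for ii in range(100):
    page = client.get("rdmq-nq56", limit=limit, offset=offset)
    if not page:
        break  # no more results
    results.extend(page)
    offset += limit
else:
    # only reached if the loop never broke, i.e. all 100 pages were full
    raise ValueError("client has pulled 100x the socrata limit")
```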
@dsweber2 why did you choose 100 here for the limit in your rewrite? I believe 50k is the per-page item limit, not the total limit. So theoretically we could get an infinite-page result.
100 was maybe too low, though it would correspond to 5,000,000 items (100 pages of 50,000 each). If we're pulling more than that, it should be quite a while down the road, or something has gone wrong. (Or I may be misunderstanding how item counts work.)
This file seems to be unused: nssp/tests/test_data/page.txt
Some nits and style suggestions. Question about which geos we're reporting.
Some remaining documentation cleanup. I also have a few questions about the data. Looks like there is data that is not getting included in the aggregation step -- we'll want to think about what to do with that.
nssp/delphi_nssp/run.py
Outdated
```python
elif geo == "hrr" or geo == "msa":
    df = df[['fips', 'val', 'timestamp']]
    df = geo_mapper.add_population_column(df, geocode_type="fips", geocode_col="fips")
    df = geo_mapper.add_geocode(df, "fips", geo, from_col="fips", new_col="geo_id")
```
note: This step adds many rows (238k -> 395k) in the HRR aggregation case. It appears this is because HRRs are made up of zip codes, which cross county boundaries. So some FIPS codes map to multiple HRRs.
This step drops many rows (238k -> 92k) in the MSA case, because many counties don't fall into an MSA.
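To illustrate the row growth in the HRR case, here is a toy sketch (made-up FIPS codes, HRR names, and weights, not the real crosswalk): a county whose zip codes straddle two HRRs appears once per HRR, so the merge multiplies its rows.

```python
import pandas as pd

# toy crosswalk: fips 06001's zips straddle two hypothetical HRRs,
# so it maps to both, with population-share weights
crosswalk = pd.DataFrame({
    "fips":   ["06001", "06001", "06003"],
    "hrr":    ["hrr_a", "hrr_b", "hrr_a"],
    "weight": [0.7, 0.3, 1.0],
})

df = pd.DataFrame({"fips": ["06001", "06003"], "val": [10.0, 20.0]})

# 2 input rows -> 3 output rows, because 06001 matches two crosswalk rows
merged = df.merge(crosswalk, on="fips")
print(merged)
```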
nssp/delphi_nssp/run.py
Outdated
```python
    generate the relevant population amounts, and create a weighted but
    unnormalized column, derived from `column_aggregating`
    """
    # set the weight of places with na's to zero
```
question: What is the meaning of a missing value in the incoming data? Censored for privacy? 0? Too small a sample size to accurately report?
nssp/delphi_nssp/run.py
Outdated
```python
df = geo_mapper.add_geocode(df, "fips", geo, from_col="fips", new_col="geo_id")
df = generate_weights(df, "val")
df = weighted_geo_sum(df, "geo_id", "val")
df = df.groupby(["timestamp", "geo_id", "val"]).sum(numeric_only=True).reset_index()
```
question: I'm confused about this line. I don't think we need to do another sum; I thought we were done on the previous line. This step is actually removing dates/geos where the final aggregated value is `NaN`, but other values are unchanged. If that is our goal, the code should be more clear about that goal (e.g. it's unclear why this groups by `val`).

question: do we want to remove `NaN`s? I can't remember what we've done in the past.
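If removing the `NaN` rows really is the goal, a more explicit sketch (made-up data; assuming the final aggregated column is named `val`) would be:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "timestamp": ["2024-04-13", "2024-04-13", "2024-04-20"],
    "geo_id": ["a", "b", "a"],
    "val": [1.0, np.nan, 2.0],
})

# drop the date/geo rows whose final aggregated value is NaN,
# leaving every other row unchanged
df = df.dropna(subset=["val"]).reset_index(drop=True)
print(df)  # the ("2024-04-13", "b") row is gone
```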
nssp/delphi_nssp/run.py
Outdated
```python
        return np.nan
    return np.nansum(x)


def weighted_geo_sum(df: pd.DataFrame, geo: str, sensor: str):
```
praise: Nice! Thanks for adding the aggregation in. Now we'll have an actual reference for how to aggregate non-summable signals 👏
We'll also need to do a statistical review of the created data. Because there's no formal process for this, you're free to do whatever you think is reasonable/sufficient. I'd say a thorough analysis would include:

- Example analysis 1, analysis 2. Part of analysis for wastewater.
- Getting the stakeholder to look at your plots and approve if they look reasonable.

Some of this investigation will be useful to include in the signal documentation. I'd recommend putting the plots in the GitHub issue so they're easy to find later.
nssp/delphi_nssp/run.py
Outdated
```python
df = generate_weights(df, "val")
df = weighted_geo_sum(df, "geo_id", "val")
```
note: once the population and new geo_id columns have been added on, a simple weighted mean can be done with

```python
df.groupby(["timestamp", "geo_id"]).apply(lambda x: sum(x.population * x.val) / x.population.sum())
```

but this doesn't handle the `NaN`s appropriately. Actually I'm surprised that there isn't a built-in weighted mean function. `numpy` has an average function that takes weights but also doesn't let you ignore `NaN`s.
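A minimal sketch of a `NaN`-ignoring weighted mean (the helper name and the toy data here are hypothetical, not the indicator's actual code):

```python
import numpy as np
import pandas as pd

def nan_weighted_mean(values, weights):
    """Weighted mean that ignores entries whose value is NaN."""
    values = np.asarray(values, dtype=float)
    weights = np.asarray(weights, dtype=float)
    mask = ~np.isnan(values)
    if not mask.any() or weights[mask].sum() == 0:
        return np.nan
    return np.average(values[mask], weights=weights[mask])

df = pd.DataFrame({
    "timestamp": ["2024-04-13"] * 3,
    "geo_id": ["a", "a", "b"],
    "population": [100.0, 300.0, 50.0],
    "val": [2.0, np.nan, 4.0],
})

# per-group weighted mean that skips the NaN instead of propagating it
out = df.groupby(["timestamp", "geo_id"]).apply(
    lambda g: nan_weighted_mean(g["val"], g["population"])
)
print(out)
```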
yeah, I had to write these as a way to handle `NaN` correctly for nwss and they're getting reused here. I plan to eventually add them to the geomapper. It is slower than using precomputed weights, so I'm also debating adding those. There's probably also room for optimizing them, though.

We may want to find a domain expert to comment on the assumption that "# of ER visits" is proportional to "county population" and similar assumptions that actually make this work in other contexts.
Yeah, at least an acknowledgement of the assumption would be good to include in the documentation for this data source.
Talking with @dshemetov, apparently in the HRR case the weighted fips -> hrr map is already present, so we may not need to do this. He may add the weighted fips -> msa map in the near future, since the data is already there for it.

Oh, one thing I would add from our discussion: CA does have state level data, somehow. I guess the reporting pipelines must be different.
Does it make sense to censor these? Pros:
Cons:
Given we're pulling from a public data source, I would trust them to have done the censoring correctly. Maybe this is historically a bad assumption?
Some correlation data. I'm comparing the state level data with hhs.

Flu: the overall (Spearman) correlation is 0.87, so they are strongly correlated in general (which one very much expects).

- Fixed State, over time
- Fixed Time over states: significantly lower on average. I think we have a mild example of Simpson's paradox or something along those lines.
- Lag correlations

Covid: overall correlation is 0.81, so a bit worse than flu but still quite high.

- Covid Fixed States over time
- Covid Fixed Time over states: significantly lower on average than both the corresponding flu and the spatial average.
- Covid Lag analysis
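For reference, a self-contained sketch of how these three correlation views can be computed with pandas (toy data and hypothetical column names; the actual analysis code isn't in this thread):

```python
import pandas as pd

# hypothetical merged frame: one row per (state, week) with both signals
merged = pd.DataFrame({
    "state": ["ca", "ca", "ca", "ny", "ny", "ny"],
    "week":  [1, 2, 3, 1, 2, 3],
    "nssp_val": [1.0, 2.0, 4.0, 2.0, 3.0, 5.0],
    "hhs_val":  [1.1, 2.2, 3.9, 1.8, 3.1, 5.2],
})

# overall correlation across all rows
overall = merged["nssp_val"].corr(merged["hhs_val"], method="spearman")

# fixed state, over time: one correlation per state
by_state = merged.groupby("state").apply(
    lambda g: g["nssp_val"].corr(g["hhs_val"], method="spearman")
)

# fixed time, over states: one correlation per week
by_week = merged.groupby("week").apply(
    lambda g: g["nssp_val"].corr(g["hhs_val"], method="spearman")
)

print(overall, by_state, by_week, sep="\n")
```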
Quick analysis of nssp signals source backfill behavior

I set up a cron job to run the following chunk of code on a server every day for the past 2 weeks. The chunk creates a daily snapshot of the data available through the Socrata API.

Code to take daily snapshot from source api:

```python
import pandas as pd
from sodapy import Socrata
from datetime import date

today = date.today()

socrata_token = 'sOcrAt4t0k3n'  # replace with your own Socrata token
client = Socrata("data.cdc.gov", socrata_token)

results = []
offset = 0
limit = 50000  # maximum limit allowed by SODA 2.0
while True:
    page = client.get("rdmq-nq56", limit=limit, offset=offset)
    if not page:
        break  # exit the loop if no more results
    results.extend(page)
    offset += limit

df_ervisits = pd.DataFrame.from_records(results)
df_ervisits.to_csv(f'{today}.csv', index=False)
```

Grab those daily snapshots of data and put them into a dictionary of dataframes for analysis.

Setup for analysis:

```python
import os
import pandas as pd

# Get the current directory
current_dir = os.getcwd()

# Initialize an empty dictionary to store the dataframes
dataframes = {}

# Iterate over each file in the current directory
for file in os.listdir(current_dir):
    if file.endswith(".csv"):
        # Get the name of the CSV file without the extension
        name = os.path.splitext(file)[0]
        # Read the CSV file into a dataframe
        df = pd.read_csv(file)
        # Store the dataframe in the dictionary with the name as the key
        dataframes[name] = df
```

We can see here that (at least recently) the API content is updated every week, some time between Thursday night and Friday afternoon.

Compare snapshot code:

```python
from datetime import datetime, timedelta

dataframes = {key: dataframes[key] for key in sorted(dataframes)}
for date1 in dataframes:
    date2 = datetime.strptime(date1, "%Y-%m-%d") + timedelta(days=1)
    date2 = date2.strftime("%Y-%m-%d")
    if date2 not in dataframes:
        continue
    if dataframes[date1].equals(dataframes[date2]):
        print(f"Snapshot {date1} and Snapshot {date2} are alike.")
    else:
        print(f"Snapshot {date1} and Snapshot {date2} are different.")
```

Output:

Specifically, every week the source updates with info from the week before.

```python
for datedf in dataframes:
    df = dataframes[datedf]
    # Convert the 'week_end' column to datetime
    df['week_end'] = pd.to_datetime(df['week_end'])
    # Get the row with the most recent date
    most_recent_row = df.loc[df['week_end'].idxmax()]
    print(f"Most recent row in {datedf}:")
    print(most_recent_row['week_end'])
```

More comparison, specifically of the case between the data available on 20240418 and 20240419, here.
We should get this merged and running on staging @nmdefries @melange396. Every Friday that we don't have it running on staging is revisions that we're losing. What blockers remain?
I'm working through a big backlog of PRs, but this one is now at the top of my list. @minhkhul: we should be able to get this running on staging before it's merged, right?
@dsweber2 @melange396 I wrote some code a month ago to call the API every day and keep all the data returned on a separate server. These can be transformed and loaded as patches later. So no worries about missing data.
Oh, good! I thought I remembered having a discussion about that but wasn't sure if it was for this or something else.
I'm gonna do a deeper look at the core code, but I did a first pass to see if I could narrow down which files I needed to inspect more closely; I knocked off 29/46 files that way!
There are a few things that seem to be out of scope for this PR, like all the `max-line-length=120` changes in the pylint configs from other indicators... Are they going to break the lint steps in other indicators?
"max_age":15, | ||
"max_age":13, |
duplicate key; which value is appropriate? `sir_complainsalot/params.json.template` claims "13" for this, but `ansible/templates/nssp-params-prod.json.j2` has `"max_expected_lag": {"all": "15"}`.
my bad, this came from an apparently botched rebase. Minh specifically set it to 13.
what about the "15" in the other location? are those not referring to the ~same thing?
fixed. They are all uniformly max 13 days now, which is based on analysis of API data in April 2024.
| published on | most recent data (`week_end`) | lag (days) |
| --- | --- | --- |
| 2024-04-17 | 2024-04-06 | 12 |
| 2024-04-18 | 2024-04-06 | 13 |
| 2024-04-19 | 2024-04-13 | 7 |
| 2024-04-20 | 2024-04-13 | 8 |
| 2024-04-21 | 2024-04-13 | 9 |
| 2024-04-22 | 2024-04-13 | 10 |
| 2024-04-23 | 2024-04-13 | 11 |
| 2024-04-24 | 2024-04-13 | 12 |
| 2024-04-25 | 2024-04-13 | 13 |
| 2024-04-26 | 2024-04-20 | 7 |
| 2024-04-27 | 2024-04-20 | 8 |
| 2024-04-29 | 2024-04-20 | 10 |
| 2024-04-30 | 2024-04-20 | 11 |
| 2024-05-01 | 2024-04-20 | 12 |
nssp/setup.py
Outdated
```python
setup(
    name="delphi_nssp",
    version="0.0.1",
```
`nssp/delphi_nssp/__init__.py` claims version "0.1.0"... We should probably make them agree.
switching to `0.1.0` to match other initial release indicators. Though I did notice that e.g. `claims_hosp/version.cfg` has `current_version = 0.3.54`, which seems weird given that its `setup.py` says `0.1.0`.
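One way to prevent this kind of drift (a sketch, assuming `__init__.py` defines a `__version__` string; not necessarily how this repo's bump2version setup works) is to declare the version once and parse it in `setup.py`:

```python
import re
from pathlib import Path

from setuptools import setup

# single source of truth: the version string in the package __init__
init_text = Path("delphi_nssp/__init__.py").read_text()
version = re.search(r'__version__\s*=\s*"(.+?)"', init_text).group(1)

setup(
    name="delphi_nssp",
    version=version,
)
```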
oof, that's a problem with our bump2version setup and/or our conventions... I'll make an issue to look at it.
```r
local({

  # the requested version of renv
  version <- "1.0.7"
```
this file seems to be imported from some external source. Will we need to do anything to keep it synced/up-to-date? I presume (alongside `notebooks/renv.lock`) it should continue to "just work" until we need it to do something it can't already do?
I'm not 100% sure this is the right place to keep these notebooks to be honest, but I feel like the current set of notes about adding indicators is way too scattered. None of this folder is really intended to be automated.

`renv/` and `renv.lock` are maintained via a CLI in R, and allow for pinning versions of dependencies (in this case, for the notebooks).
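For reference, the usual renv workflow for keeping that lockfile current uses renv's own commands (standard renv API, nothing repo-specific):

```r
# install.packages("renv")  # one-time, if renv isn't installed yet

renv::init()      # set up renv/ and renv.lock for the project
renv::snapshot()  # record currently installed package versions into renv.lock
renv::restore()   # reinstall the pinned versions on a fresh checkout
```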
Description
Add 8 signals from source nssp