import os
import pathlib
from urllib.error import HTTPError
from warnings import warn

import geopandas as gpd
import pandas as pd
import pooch


def get_census_gdb(years=None, geom_level="blockgroup", output_dir="."):
    """Fetch file geodatabases of ACS demographic profile data from the Census Bureau server.

    Parameters
    ----------
    years : list, optional
        set of years to download (2010 onward); defaults to 2010-2019
    geom_level : str, optional
        geographic unit to download ('tract' or 'blockgroup'), by default "blockgroup"
    output_dir : str, optional
        output directory to write files, by default "."
    """
    levels = {"blockgroup": "bg", "tract": "tract"}
    if not years:
        years = range(2010, 2020)
    for year in years:
        fn = f"ACS_{year}_5YR_{levels[geom_level].upper()}.gdb.zip"
        # keep a consistent local filename regardless of the server-side naming
        out_fn = fn
        if year in [2010, 2011]:
            if geom_level == "blockgroup":
                raise Exception(f"blockgroup data not available for {year}")
            # the server uses a different naming convention for 2010 and 2011
            fn = f"{year}_ACS_5YR_{geom_level.capitalize()}.gdb.zip"
        url = f"https://www2.census.gov/geo/tiger/TIGER_DP/{year}ACS/{fn}"
        pooch.retrieve(url, None, fname=out_fn, path=output_dir, progressbar=True)
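
# A minimal usage sketch (an illustration, not part of the original module;
# requires network access and substantial disk space for the geodatabases):
#
#   >>> get_census_gdb(years=[2016], geom_level="tract", output_dir="./gdbs")
#
# fetches ACS_2016_5YR_TRACT.gdb.zip into ./gdbs.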


def reformat_acs_vars(col):
    """Convert variable names to the same format used by the Census Detailed Tables API.

    See <https://api.census.gov/data/2019/acs/acs5/variables.html> for variable descriptions.

    Parameters
    ----------
    col : str
        column name to adjust

    Returns
    -------
    str
        reformatted column name
    """
    pieces = col.split("e")
    formatted = pieces[0] + "_" + pieces[1].rjust(3, "0") + "E"
    return formatted
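
# For example (a sketch of the expected behavior, given the geodatabase naming
# convention where "e" separates the table ID from the estimate number):
#
#   >>> reformat_acs_vars("B01001e1")
#   'B01001_001E'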


def convert_census_gdb(
    file,
    year=None,
    layers=None,
    level="bg",
    save_intermediate=True,
    combine=True,
    output_dir=".",
):
    """Convert a file geodatabase from the Census Bureau into a (set of) parquet file(s).

    Parameters
    ----------
    file : str
        path to file geodatabase
    year : str, optional
        year that the data should be named by. If None, the year is inferred from the
        filename, assuming the naming convention used on the Census Bureau FTP server
    layers : list, optional
        set of layers to extract from the geodatabase. If None (default), all layers
        except the metadata layer are extracted
    level : str, optional
        geographic level of the data ('bg' for blockgroups or 'tr' for tracts),
        by default "bg"
    save_intermediate : bool, optional
        if True, each layer is stored separately as a parquet file, by default True
    combine : bool, optional
        if True, the layers are concatenated into a single parquet file, by default True
    output_dir : str, optional
        path to directory where parquet files will be written, by default "."
    """
    try:
        import pyogrio as ogr
    except ImportError:
        raise ImportError(
            "this function requires the `pyogrio` package\n`conda install pyogrio`"
        )
    if not layers:  # grab them all except the metadata layer
        year_suffix = file.split(".")[0].split("_")[1][-2:]
        meta_str = f"{level.upper()}_METADATA_20{year_suffix}"
        layers = [layer[0] for layer in ogr.list_layers(file)]
        if meta_str in layers:
            layers.remove(meta_str)
    if not year:
        # make a strong assumption about the name of the file coming from census
        year = file.split("_")[1]
    tables = []
    for i in layers:
        print(i)
        df = ogr.read_dataframe(file, layer=i).set_index("GEOID")
        if "ACS_" in i:
            # the ACS_* layer holds the geometries, so keep it as a GeoDataFrame
            df = gpd.GeoDataFrame(df)
        else:
            # keep only the estimate columns (names containing "e") and rename
            # them to match the Census Detailed Tables API convention
            df = df[df.columns[df.columns.str.contains("e")]]
            df.columns = pd.Series(df.columns).apply(reformat_acs_vars)
        df = df.dropna(axis=1, how="all")
        if combine:
            tables.append(df)
        if save_intermediate:
            df.to_parquet(
                pathlib.PurePath(output_dir, f"acs_{year}_{i}_{level}.parquet")
            )
    if combine:
        df = pd.concat(tables, axis=1)
        if f"ACS_{year}_5YR_{level.upper()}" in layers:
            df = gpd.GeoDataFrame(df)
        df.to_parquet(pathlib.PurePath(output_dir, f"acs_{year}_{level}.parquet"))
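
# A hedged usage sketch (the path and layer name below are hypothetical and
# assume the standard Census Bureau geodatabase layout; requires pyogrio):
#
#   >>> convert_census_gdb("ACS_2018_5YR_BG.gdb", layers=["X01_AGE_AND_SEX"])
#
# would write acs_2018_X01_AGE_AND_SEX_bg.parquet and acs_2018_bg.parquet to
# the current directory.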


def get_lehd(dataset="wac", state="dc", year=2015):
    """Grab data from the LODES FTP server as a pandas DataFrame.

    Parameters
    ----------
    dataset : str
        which LODES dataset to collect: "rac" or "wac", referring to either
        residence area characteristics or workplace area characteristics
        (the default is "wac").
    state : str
        two-letter state abbreviation, for example "ca" or "OH" (case-insensitive)
    year : str or int
        which year to collect. The first year available for most states is 2002.
        Consult the LODES documentation for more details. The default is 2015.

    Returns
    -------
    pandas.DataFrame
        a pandas DataFrame with columns representing census blocks, indexed on
        the block FIPS code.
    """
    # map LODES variable codes to human-readable names using the lookup table
    # shipped alongside this module
    lodes_vars = pd.read_csv(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "lodes.csv")
    )
    renamer = dict(zip(lodes_vars["variable"].tolist(), lodes_vars["name"].tolist()))
    state = state.lower()
    url = f"https://lehd.ces.census.gov/data/lodes/LODES7/{state}/{dataset}/{state}_{dataset}_S000_JT00_{year}.csv.gz"
    try:
        df = pd.read_csv(url, converters={"w_geocode": str, "h_geocode": str})
    except HTTPError:
        raise ValueError(
            "Unable to retrieve LEHD data. Check your internet connection "
            "and that the state/year combination you specified is available"
        )
    # wac files index blocks by w_geocode, rac files by h_geocode
    df = df.rename({"w_geocode": "geoid", "h_geocode": "geoid"}, axis=1)
    df = df.rename(renamer, axis="columns")
    return df.set_index("geoid")
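
# A minimal usage sketch (requires network access; see the LODES documentation
# for available state/year combinations):
#
#   >>> wac = get_lehd(dataset="wac", state="dc", year=2015)
#   >>> wac.head()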


def adjust_inflation(df, columns, given_year, base_year=2015):
    """Adjust currency data for inflation.

    Parameters
    ----------
    df : DataFrame
        Dataframe of historical data
    columns : list-like
        The columns of the dataframe with currency data
    given_year : int
        The year in which the data were collected; e.g. to convert data from
        the 1990 census to 2015 dollars, this value should be 1990.
    base_year : int, optional
        Constant dollar year; e.g. to convert data from the 1990
        census to constant 2015 dollars, this value should be 2015.
        Default is 2015.

    Returns
    -------
    DataFrame
        dataframe with the given columns converted to constant dollars
    """
    # get the inflation adjustment table (CPI research series) from BLS
    try:
        inflation = pd.read_csv(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "inflation.csv")
        )
    except FileNotFoundError:
        warn("Unable to read local inflation adjustment file. Streaming from BLS")
        inflation = pd.read_excel(
            "https://www.bls.gov/cpi/research-series/allitems.xlsx", skiprows=5
        )

    if base_year not in inflation.YEAR.unique():
        warn(
            f"Unable to find local adjustment year for {base_year}. Attempting from online data"
        )
        try:
            inflation = pd.read_excel(
                "https://www.bls.gov/cpi/research-series/allitems.xlsx", skiprows=5
            )
            assert (
                base_year in inflation.YEAR.unique()
            ), f"Unable to find adjustment values for {base_year}"
        except Exception:
            raise ValueError(f"Unable to find adjustment values for {base_year}")

    inflation.columns = inflation.columns.str.lower()
    inflation.columns = inflation.columns.str.strip(".")
    inflation = inflation.dropna(subset=["year"])
    # map each year to its average CPI value
    inflator = inflation.groupby("year")["avg"].first().to_dict()
    inflator[1970] = 63.9  # hardcoded, since 1970 is not included in the BLS table

    df = df.copy()
    updated = df[columns].apply(
        lambda x: x * (inflator[base_year] / inflator[given_year])
    )
    df.update(updated)
    return df
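
# For instance, converting 1990 census dollars to 2015 dollars (a sketch; the
# column name is hypothetical):
#
#   >>> data = pd.DataFrame({"median_income": [30000.0]})
#   >>> adjust_inflation(data, ["median_income"], given_year=1990)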


if __name__ == "__main__":
    # simple smoke test; get_lehd() needs network access, and adjust_inflation()
    # requires a dataframe, currency columns, and a collection year
    print(get_lehd().head())
    print(adjust_inflation(pd.DataFrame({"income": [30000.0]}), ["income"], 1990))