Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Move Python aggregation funcs to new file #244

Draft
wants to merge 1 commit into
base: add-long-agg
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Python-packages/covidcast-py/covidcast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

"""

from .covidcast import signal, metadata, aggregate_signals
from .covidcast import signal, metadata
from .wrangle import aggregate_signals
from .plotting import plot, plot_choropleth, get_geo_df, animate
from .geography import (fips_to_name, cbsa_to_name, abbr_to_name,
name_to_abbr, name_to_cbsa, name_to_fips,
Expand Down
95 changes: 0 additions & 95 deletions Python-packages/covidcast-py/covidcast/covidcast.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This is the client side library for accessing the COVIDcast API."""
import warnings
from datetime import timedelta, date
from functools import reduce
from typing import Union, Iterable, Tuple, List

import pandas as pd
Expand Down Expand Up @@ -272,100 +271,6 @@ def metadata() -> pd.DataFrame:
return meta_df


def aggregate_signals(signals: list,
                      dt: list = None,
                      join_type: str = "outer",
                      output_format: str = "wide") -> pd.DataFrame:
    """Given a list of DataFrames, [optionally] lag each one and join them into one DataFrame.

    This method takes a list of DataFrames containing signal information for
    geographic regions across time, and outputs a single DataFrame of the signals aggregated
    with lags applied to signals if specified. The input DataFrames must all be of the same
    geography type.

    If ``output_format = 'wide'``, a DataFrame with a column
    for each signal value for each region/time. The ``data_source``,
    ``signal``, and index of each DataFrame in ``signals`` are appended to the
    front of each output column name separated by underscores (e.g.
    ``source_signal_0_inputcolumn``), and the original data_source and signal
    columns will be dropped. A single ``geo_type`` column will be returned in the final
    DataFrame.

    If ``output_format = 'long'``, all input DataFrames must have the same columns,
    and the output will be the concatenation of all the lagged DataFrames.

    Each signal's time value can be shifted for analysis on lagged signals using the ``dt``
    argument, which takes a list of integer days to lag each signal's date. Lagging a signal by +1
    day means that all the dates get shifted forward by 1 day (e.g. Jan 1 becomes Jan 2).

    :param signals: List of DataFrames to join.
    :param dt: List of lags in days for each of the input DataFrames in ``signals``.
        Defaults to ``None``. When provided, must be the same length as ``signals``.
    :param join_type: Type of join to be done between the DataFrames in ``signals``.
        Defaults to ``"outer"``, so the output DataFrame contains all region/time
        combinations at which at least one signal was observed.
        Only applies if ``output_format='wide'``
    :param output_format: ``'wide'`` or ``'long'``. If ``wide``, a dataframe with a column
        per signal is returned. If ``long``, all signals are concatenated into one dataframe with
        a single column for the signal value.
    :return: DataFrame of aggregated signals.
    :raises ValueError: If ``dt`` and ``signals`` differ in length, ``output_format`` is
        unrecognized, or the inputs mix geo_types.

    """
    if dt is not None and len(dt) != len(signals):
        raise ValueError("Length of `dt` must be same as length of `signals`")
    if output_format not in ["long", "wide"]:
        raise ValueError("`output_format` must be either 'long' or 'wide'")

    # No lags supplied: treat every signal as unlagged (0-day shift).
    dt = [0] * len(signals) if not dt else dt
    first_geo_type = _detect_metadata(signals[0])[2]  # geo_type every input must share
    dt_dfs = []
    for df, lag in zip(signals, dt):
        source, sig_type, geo_type = _detect_metadata(df)
        if geo_type != first_geo_type:
            raise ValueError("Multiple geo_types detected. "
                             "All signals must have the same geo_type to be aggregated.")
        df_c = df.copy()  # make a copy so we don't modify originals
        df_c["time_value"] = [day + timedelta(lag) for day in df_c["time_value"]]  # lag dates
        dt_dfs.append((df_c, source, sig_type, geo_type))
    return _agg_wide(dt_dfs, join_type) if output_format == "wide" else _agg_long(dt_dfs)


def _agg_wide(processed_signals: list,
              join_type: str = "outer") -> pd.DataFrame:
    """Join together a list of signal DataFrames, renaming columns to prevent collisions.

    NOTE(review): this mutates the DataFrames inside ``processed_signals`` in place
    (drop/rename with ``inplace=True``); ``aggregate_signals`` passes copies, so the
    caller's originals are safe.

    :param processed_signals: List of df and metadata tuples to join together.
    :param join_type: Type of join to conduct between all the DataFrames.
    :return: A single DataFrames which is the join of the input DataFrames.
    """
    join_cols = ["time_value", "geo_value"]  # region/time keys shared by every signal
    for i, (df, source, sig_type, _) in enumerate(processed_signals):
        # drop and rename columns so the joined doesn't have duplicate and/or redundant columns.
        # Value columns become f"{source}_{signal}_{index}_{column}" to stay unique per input.
        df.drop(["signal", "data_source", "geo_type"], axis=1, inplace=True)
        df.rename(
            columns={j: f"{source}_{sig_type}_{i}_{j}" for j in df.columns if j not in join_cols},
            inplace=True)
    dfs_to_join = [df for df, *_ in processed_signals]
    # Pairwise fold of merges; sort=True keeps output ordered by the join keys.
    joined_df = reduce(lambda x, y: pd.merge(x, y, on=join_cols, how=join_type, sort=True),
                       dfs_to_join)
    joined_df["geo_type"] = processed_signals[0][-1]  # use geotype of first df
    return joined_df


def _agg_long(processed_signals: list) -> pd.DataFrame:
"""Concatenate a list of signal DataFrames with identical columns.

:param processed_signals: List of DataFrame and metadata tuples to concatenate together.
:return: Single DataFrames of all input signals concatenated
"""
first_columns = processed_signals[0][0].columns
for df, *_ in processed_signals:
if any(df.columns != first_columns):
raise ValueError("Inconsistent columns detected. All columns must be the same to use"
"'long' output.")
return pd.concat([df for df, *_ in processed_signals]).reset_index(drop=True)


def _detect_metadata(data: pd.DataFrame,
data_source_col: str = "data_source",
signal_col: str = "signal",
Expand Down
100 changes: 100 additions & 0 deletions Python-packages/covidcast-py/covidcast/wrangle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Functions for manipulating signal DataFrames."""
from datetime import timedelta
from functools import reduce

import pandas as pd

from .covidcast import _detect_metadata


def aggregate_signals(signals: list,
                      dt: list = None,
                      join_type: str = "outer",
                      output_format: str = "wide") -> pd.DataFrame:
    """Given a list of DataFrames, [optionally] lag each one and join them into one DataFrame.

    This method takes a list of DataFrames containing signal information for
    geographic regions across time, and outputs a single DataFrame of the signals aggregated
    with lags applied to signals if specified. The input DataFrames must all be of the same
    geography type.

    If ``output_format = 'wide'``, a DataFrame with a column
    for each signal value for each region/time. The ``data_source``,
    ``signal``, and index of each DataFrame in ``signals`` are appended to the
    front of each output column name separated by underscores (e.g.
    ``source_signal_0_inputcolumn``), and the original data_source and signal
    columns will be dropped. A single ``geo_type`` column will be returned in the final
    DataFrame.

    If ``output_format = 'long'``, all input DataFrames must have the same columns,
    and the output will be the concatenation of all the lagged DataFrames.

    Each signal's time value can be shifted for analysis on lagged signals using the ``dt``
    argument, which takes a list of integer days to lag each signal's date. Lagging a signal by +1
    day means that all the dates get shifted forward by 1 day (e.g. Jan 1 becomes Jan 2).

    :param signals: List of DataFrames to join.
    :param dt: List of lags in days for each of the input DataFrames in ``signals``.
        Defaults to ``None``. When provided, must be the same length as ``signals``.
    :param join_type: Type of join to be done between the DataFrames in ``signals``.
        Defaults to ``"outer"``, so the output DataFrame contains all region/time
        combinations at which at least one signal was observed.
        Only applies if ``output_format='wide'``
    :param output_format: ``'wide'`` or ``'long'``. If ``wide``, a dataframe with a column
        per signal is returned. If ``long``, all signals are concatenated into one dataframe with
        a single column for the signal value.
    :return: DataFrame of aggregated signals.
    :raises ValueError: If ``dt`` and ``signals`` differ in length, ``output_format`` is
        unrecognized, or the inputs mix geo_types.

    """
    if dt is not None and len(dt) != len(signals):
        raise ValueError("Length of `dt` must be same as length of `signals`")
    if output_format not in ["long", "wide"]:
        raise ValueError("`output_format` must be either 'long' or 'wide'")

    # Explicit None check (rather than truthiness) so the intent is clear: no lags
    # supplied means every signal gets a 0-day shift.
    dt = [0] * len(signals) if dt is None else dt
    first_geo_type = _detect_metadata(signals[0])[2]  # geo_type every input must share
    dt_dfs = []
    for df, lag in zip(signals, dt):
        source, sig_type, geo_type = _detect_metadata(df)
        if geo_type != first_geo_type:
            raise ValueError("Multiple geo_types detected. "
                             "All signals must have the same geo_type to be aggregated.")
        df_c = df.copy()  # make a copy so we don't modify originals
        df_c["time_value"] = [day + timedelta(days=lag) for day in df_c["time_value"]]  # lag dates
        dt_dfs.append((df_c, source, sig_type, geo_type))
    return _agg_wide(dt_dfs, join_type) if output_format == "wide" else _agg_long(dt_dfs)


def _agg_wide(processed_signals: list,
join_type: str = "outer") -> pd.DataFrame:
"""Join together a list of signal DataFrames, renaming columns to prevent collisions.

:param processed_signals: List of df and metadata tuples to join together.
:param join_type: Type of join to conduct between all the DataFrames.
:return: A single DataFrames which is the join of the input DataFrames.
"""
join_cols = ["time_value", "geo_value"]
for i, (df, source, sig_type, _) in enumerate(processed_signals):
# drop and rename columns so the joined doesn't have duplicate and/or redundant columns.
df.drop(["signal", "data_source", "geo_type"], axis=1, inplace=True)
df.rename(
columns={j: f"{source}_{sig_type}_{i}_{j}" for j in df.columns if j not in join_cols},
inplace=True)
dfs_to_join = [df for df, *_ in processed_signals]
joined_df = reduce(lambda x, y: x.merge(y, on=join_cols, how=join_type, sort=True), dfs_to_join)
joined_df["geo_type"] = processed_signals[0][-1] # use geotype of first df
return joined_df


def _agg_long(processed_signals: list) -> pd.DataFrame:
"""Concatenate a list of signal DataFrames with identical columns.

:param processed_signals: List of DataFrame and metadata tuples to concatenate together.
:return: Single DataFrames of all input signals concatenated
"""
first_columns = processed_signals[0][0].columns
for df, *_ in processed_signals:
if any(df.columns != first_columns):
raise ValueError("Inconsistent columns detected. All columns must be the same to use"
"'long' output.")
return pd.concat([df for df, *_ in processed_signals]).reset_index(drop=True)
95 changes: 0 additions & 95 deletions Python-packages/covidcast-py/tests/covidcast/test_covidcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,101 +99,6 @@ def test_metadata(mock_covidcast_meta):
covidcast.metadata()


def test_aggregate_signals():
    """End-to-end tests for ``aggregate_signals``.

    Covers: wide outer/inner joins across sources, lagged joins of the same signal,
    long-format concatenation, and the error paths (inconsistent columns for 'long',
    mismatched ``dt`` length, mixed geo_types).
    """
    # Three inputs from distinct sources; input2 carries an extra column to exercise
    # per-input column renaming in wide output.
    test_input1 = pd.DataFrame(
        {"geo_value": ["a", "b", "c", "a"],
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)],
         "value": [2, 4, 6, 8],
         "signal": ["i", "i", "i", "i"],
         "geo_type": ["state", "state", "state", "state"],
         "data_source": ["x", "x", "x", "x"]})
    test_input2 = pd.DataFrame(
        {"geo_value": ["a", "b", "c", "d"],
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1)],
         "value": [1, 3, 5, 7],
         "signal": ["j", "j", "j", "j"],
         "geo_type": ["state", "state", "state", "state"],
         "data_source": ["y", "y", "y", "y"],
         "extra_col": ["0", "0", "0", "0"]})
    test_input3 = pd.DataFrame(
        {"geo_value": ["b", "c", "d", "b"],
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)],
         "value": [0.5, 1.5, 2.5, 3.5],
         "signal": ["k", "k", "k", "k"],
         "geo_type": ["state", "state", "state", "state"],
         "data_source": ["z", "z", "z", "z"]})
    # test 3 signals from 3 sources with outer join
    expected1 = pd.DataFrame(
        {"geo_value": ["a", "b", "c", "d", "a", "b", "c", "d", "b"],
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1),
                        date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2),
                        date(2020, 1, 3)],
         "x_i_0_value": [2, 4, 6, np.nan, 8, np.nan, np.nan, np.nan, np.nan],
         "y_j_1_value": [1, 3, 5, 7, np.nan, np.nan, np.nan, np.nan, np.nan],
         "y_j_1_extra_col": ["0", "0", "0", "0", np.nan, np.nan, np.nan, np.nan, np.nan],
         "z_k_2_value": [np.nan, np.nan, np.nan, np.nan, np.nan, 0.5, 1.5, 2.5, 3.5],
         "geo_type": ["state"]*9})
    assert covidcast.aggregate_signals(
        [test_input1, test_input2, test_input3], dt=[0, 0, 1]).equals(expected1)

    # test 3 signals from 3 sources with inner join has no intersection
    assert covidcast.aggregate_signals(
        [test_input1, test_input3], dt=[0, 1], join_type="inner").empty

    # test 2 signals from same source (one lagged) with inner join
    expected2 = pd.DataFrame(
        {"geo_value": ["a"],
         "time_value": [date(2020, 1, 2)],
         "x_i_0_value": [8],
         "x_i_1_value": [2],
         "geo_type": ["state"]})
    assert covidcast.aggregate_signals(
        [test_input1, test_input1], dt=[0, 1], join_type="inner").equals(expected2)

    # test same signal twice with a lag
    expected3 = pd.DataFrame(
        {"geo_value": ["a", "b", "c", "a", "b", "c", "a"],
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2),
                        date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 3)],
         "x_i_0_value": [2, 4, 6, 8, np.nan, np.nan, np.nan],
         "x_i_1_value": [np.nan, np.nan, np.nan, 2, 4, 6, 8],
         "geo_type": ["state"]*7})

    assert covidcast.aggregate_signals([test_input1, test_input1], dt=[0, 1]).equals(expected3)

    # test long output
    expected4 = pd.DataFrame(
        {"geo_value": ["a", "b", "c", "a"]*2,
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2),
                        date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 3)],
         "value": [2, 4, 6, 8]*2,
         "signal": ["i", "i", "i", "i"]*2,
         "geo_type": ["state", "state", "state", "state"]*2,
         "data_source": ["x", "x", "x", "x"]*2})

    assert covidcast.aggregate_signals([test_input1, test_input1],
                                       dt=[0, 1],
                                       output_format="long").equals(expected4)
    # test long output with different column names
    with pytest.raises(ValueError):
        covidcast.aggregate_signals([test_input1, test_input2], output_format="long")

    # test invalid lag length
    with pytest.raises(ValueError):
        covidcast.aggregate_signals([test_input1, test_input1], dt=[0])

    # test mixed geo_types
    test_input4 = pd.DataFrame(
        {"geo_value": ["b", "c", "d", "b"],
         "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)],
         "value": [0.5, 1.5, 2.5, 3.5],
         "signal": ["k", "k", "k", "k"],
         "geo_type": ["county", "county", "county", "county"],
         "data_source": ["z", "z", "z", "z"]})
    with pytest.raises(ValueError):
        covidcast.aggregate_signals([test_input1, test_input4])


def test__detect_metadata():
test_input = pd.DataFrame(
{"data_source": ["a", "a"], "signal": ["b", "b"], "geo_type": ["c", "c"]})
Expand Down