Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add long output to Python aggregate_signals() method #243

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 60 additions & 18 deletions Python-packages/covidcast-py/covidcast/covidcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,28 @@ def metadata() -> pd.DataFrame:
return meta_df


def aggregate_signals(signals: list, dt: list = None, join_type: str = "outer") -> pd.DataFrame:
def aggregate_signals(signals: list,
dt: list = None,
join_type: str = "outer",
output_format:str = "wide") -> pd.DataFrame:
"""Given a list of DataFrames, [optionally] lag each one and join them into one DataFrame.

This method takes a list of DataFrames containing signal information for
geographic regions across time, and outputs a single DataFrame with a column
geographic regions across time, and outputs a single DataFrame of the signals aggregated
with lags applied to signals if specified. The input DataFrames must all be of the same
geography type.

If ``output_format = 'wide'``, a DataFrame with a column
for each signal value for each region/time. The ``data_source``,
``signal``, and index of each DataFrame in ``signals`` are appended to the
front of each output column name separated by underscores (e.g.
``source_signal_0_inputcolumn``), and the original data_source and signal
columns will be dropped. The input DataFrames must all be of the same
geography type, and a single ``geo_type`` column will be returned in the final
columns will be dropped. A single ``geo_type`` column will be returned in the final
DataFrame.

If ``output_format = 'wide'``, all input DataFrames must have the same columns,
and the output will be the concatenation of all the lagged DataFrames.

Each signal's time value can be shifted for analysis on lagged signals using the ``dt``
argument, which takes a list of integer days to lag each signal's date. Lagging a signal by +1
day means that all the dates get shifted forward by 1 day (e.g. Jan 1 becomes Jan 2).
Expand All @@ -295,35 +304,68 @@ def aggregate_signals(signals: list, dt: list = None, join_type: str = "outer")
:param join_type: Type of join to be done between the DataFrames in ``signals``.
Defaults to ``"outer"``, so the output DataFrame contains all region/time
combinations at which at least one signal was observed.
Only applies if ``output_format='wide'``
:param output_format: ``'wide'`` or ``'long'``. If ``wide``, a dataframe with a column
per signal is returned. If ``long``, all signals are concatenated into one dataframe with
a single column for the signal value.
:return: DataFrame of aggregated signals.

"""
if dt is not None and len(dt) != len(signals):
raise ValueError("Length of `dt` must be same as length of `signals`")
if output_format not in ["long", "wide"]:
raise ValueError("`output_format` must be either 'long' or 'wide'")

dt = [0] * len(signals) if not dt else dt
join_cols = ["time_value", "geo_value"]
dt_dfs = []
first_geo_type = _detect_metadata(signals[0])[2]

for i, (df, lag) in enumerate(zip(signals, dt)):
df_c = df.copy() # make a copy so we don't modify originals
source, sig_type, geo_type = _detect_metadata(df_c)
dt_dfs = []
for df, lag in zip(signals, dt):
source, sig_type, geo_type = _detect_metadata(df)
if geo_type != first_geo_type:
raise ValueError("Multiple geo_types detected. "
"All signals must have the same geo_type to be aggregated.")

df_c = df.copy() # make a copy so we don't modify originals
df_c["time_value"] = [day + timedelta(lag) for day in df_c["time_value"]] # lag dates
df_c.drop(["signal", "data_source", "geo_type"], axis=1, inplace=True)
df_c.rename(
columns={j: f"{source}_{sig_type}_{i}_{j}" for j in df_c.columns if j not in join_cols},
inplace=True)
dt_dfs.append(df_c)
dt_dfs.append((df_c, source, sig_type, geo_type))
return _agg_wide(dt_dfs, join_type) if output_format == "wide" else _agg_long(dt_dfs)


def _agg_wide(processed_signals: list,
join_type: str = "outer") -> pd.DataFrame:
"""Join together a list of signal DataFrames, renaming columns to prevent collisions.

joined_df = reduce(lambda x, y: pd.merge(x, y, on=join_cols, how=join_type, sort=True), dt_dfs)
joined_df["geo_type"] = geo_type
:param processed_signals: List of df and metadata tuples to join together.
:param join_type: Type of join to conduct between all the DataFrames.
:return: A single DataFrames which is the join of the input DataFrames.
"""
join_cols = ["time_value", "geo_value"]
for i, (df, source, sig_type, _) in enumerate(processed_signals):
# drop and rename columns so the joined doesn't have duplicate and/or redundant columns.
df.drop(["signal", "data_source", "geo_type"], axis=1, inplace=True)
df.rename(
columns={j: f"{source}_{sig_type}_{i}_{j}" for j in df.columns if j not in join_cols},
inplace=True)
dfs_to_join = [df for df, *_ in processed_signals]
joined_df = reduce(lambda x, y: pd.merge(x, y, on=join_cols, how=join_type, sort=True),
dfs_to_join)
joined_df["geo_type"] = processed_signals[0][-1] # use geotype of first df
return joined_df


def _agg_long(processed_signals: list) -> pd.DataFrame:
"""Concatenate a list of signal DataFrames with identical columns.

:param processed_signals: List of DataFrame and metadata tuples to concatenate together.
:return: Single DataFrames of all input signals concatenated
"""
first_columns = processed_signals[0][0].columns
for df, *_ in processed_signals:
if any(df.columns != first_columns):
raise ValueError("Inconsistent columns detected. All columns must be the same to use"
"'long' output.")
return pd.concat([df for df, *_ in processed_signals]).reset_index(drop=True)


def _detect_metadata(data: pd.DataFrame,
data_source_col: str = "data_source",
signal_col: str = "signal",
Expand Down
17 changes: 17 additions & 0 deletions Python-packages/covidcast-py/tests/covidcast/test_covidcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,23 @@ def test_aggregate_signals():

assert covidcast.aggregate_signals([test_input1, test_input1], dt=[0, 1]).equals(expected3)

# test long output
expected4 = pd.DataFrame(
{"geo_value": ["a", "b", "c", "a"]*2,
"time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2),
date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 3)],
"value": [2, 4, 6, 8]*2,
"signal": ["i", "i", "i", "i"]*2,
"geo_type": ["state", "state", "state", "state"]*2,
"data_source": ["x", "x", "x", "x"]*2})

assert covidcast.aggregate_signals([test_input1, test_input1],
dt=[0, 1],
output_format="long").equals(expected4)
# test long output with different column names
with pytest.raises(ValueError):
covidcast.aggregate_signals([test_input1, test_input2], output_format="long")

# test invalid lag length
with pytest.raises(ValueError):
covidcast.aggregate_signals([test_input1, test_input1], dt=[0])
Expand Down