Skip to content

Commit

Permalink
Support dataframes in DefaultExperiment (#160)
Browse files Browse the repository at this point in the history
* Add support for dataframes in generic dataset

* Add class method to default experiment for easier init

* Workaround for DictConfig not accepting dataframes

* Change instantiation of class

* Complete docstring
  • Loading branch information
sgpjesus committed Feb 1, 2024
1 parent d7929b9 commit cdff72e
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/aequitas/flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .experiment import Experiment
from .experiment import DefaultExperiment, Experiment

__all__ = ["Experiment"]
__all__ = ["DefaultExperiment", "Experiment"]
15 changes: 15 additions & 0 deletions src/aequitas/flow/datasets/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(
self,
label_column: str,
sensitive_column: str,
df: Optional[pd.DataFrame] = None,
categorical_columns: list[str] = [],
dataset_path: Optional[Union[str, Path]] = None,
train_path: Optional[Union[str, Path]] = None,
Expand All @@ -37,6 +38,15 @@ def __init__(
Parameters
----------
label_column : str
The name of the label column in the dataset.
sensitive_column : str
The name of the sensitive column in the dataset.
df : pd.DataFrame, optional
The dataset to be used. If None, the dataset will be loaded from the
specified paths. Defaults to None.
dataset_path : Union[str, Path]
The path to the dataset. May be URL.
train_path : Union[str, Path]
The path to the training data. May be URL.
validation_path : Union[str, Path]
Expand Down Expand Up @@ -72,6 +82,9 @@ def __init__(
if url(dataset_path) or Path(dataset_path).exists():
self.paths = [dataset_path]
self.split_required = True
elif df is not None:
self.data = df
self.split_required = True
else:
# Validate if other paths exist
if not (train_path and validation_path and test_path):
Expand Down Expand Up @@ -139,6 +152,8 @@ def _validate_splits(self) -> None:
def load_data(self) -> None:
"""Load the dataset."""
self.logger.info("Loading data.")
if hasattr(self, "data"):
return
if self.extension == "parquet":
read_method = pd.read_parquet
elif self.extension == "csv":
Expand Down
109 changes: 103 additions & 6 deletions src/aequitas/flow/experiment/default.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Literal, Union

from omegaconf import DictConfig
import pandas as pd

from . import _configs
from .experiment import Experiment
Expand All @@ -9,7 +10,14 @@
class DefaultExperiment(Experiment):
def __init__(
self,
dataset_config: Union[DictConfig, dict],
df: pd.DataFrame,
label_column: str,
sensitive_column: str,
categorical_columns: list[str] = [],
other_dataset_args: dict = None,
threshold_type: str = "fixed",
score_threshold: float = 0.5,
dataset_name: str = "Dataset",
methods: Union[
list[Literal["preprocessing", "inprocessing"]], Literal["all"]
] = "all",
Expand All @@ -24,19 +32,68 @@ def __init__(
Parameters
----------
dataset_config : Union[DictConfig, dict]
Dataset configuration.
df : pd.DataFrame
Pandas DataFrame with the dataset to be used in the experiment.
label_column : str
Name of the column containing the label.
sensitive_column : str
Name of the column containing the sensitive attribute.
categorical_columns : list[str], optional
List of categorical columns. Defaults to [].
other_dataset_args : dict, optional
Other arguments to pass to the dataset. Defaults to None.
threshold_type : str, optional
Threshold type. Defaults to "fixed".
score_threshold : float, optional
Score threshold. Defaults to 0.5.
dataset_name : str, optional
Dataset name. Defaults to "Dataset".
methods : Union[list[Literal["preprocessing", "inprocessing"]], Literal["all"]], optional
Methods to include in the experiment. If "all", all methods will be included.
Defaults to "all".
experiment_size : Literal["test", "small", "medium", "large"], optional
Experiment size. Defaults to "small".
use_baseline : bool, optional
Whether to include the baseline methods. Defaults to True.
Raises
------
ValueError
If the methods or experiment size are not valid.
"""
dataset_config = {
dataset_name: {
"classpath": "aequitas.flow.datasets.GenericDataset",
"threshold": {
"type": threshold_type,
"value": score_threshold,
},
"args": {
"df": df,
"label_column": label_column,
"sensitive_column": sensitive_column,
"categorical_columns": categorical_columns,
**(other_dataset_args or {}),
},
}
}

config = self._generate_config(
dataset_config=dataset_config,
methods=methods,
experiment_size=experiment_size,
use_baseline=use_baseline,
)

super().__init__(config=config)

@staticmethod
def _generate_config(
dataset_config: dict,
methods: Union[list[Literal["preprocessing", "inprocessing"]], Literal["all"]],
experiment_size: Literal["test", "small", "medium", "large"],
use_baseline: bool,
):
# Validate methods:
if methods == "all":
default_methods = [
Expand Down Expand Up @@ -73,10 +130,50 @@ def __init__(
"Invalid experiment_size value. Try one of "
f"{['test', 'small', 'medium', 'large']}."
)
# Update experiment config:
config = {
# Generate experiment config:
return {
"methods": method_configs,
"datasets": [dataset_config],
"optimization": experiment_config,
}
super().__init__(config=DictConfig(config))

@classmethod
def from_config(
cls,
dataset_config: Union[DictConfig, dict],
methods: Union[
list[Literal["preprocessing", "inprocessing"]], Literal["all"]
] = "all",
experiment_size: Literal["test", "small", "medium", "large"] = "small",
use_baseline: bool = True,
):
"""Create a DefaultExperiment from a pandas DataFrame.
Parameters
----------
dataset_config : Union[DictConfig, dict]
Dataset configuration.
methods : Union[list[Literal["preprocessing", "inprocessing"]], Literal["all"]], optional
Methods to include in the experiment. If "all", all methods will be included.
Defaults to "all".
experiment_size : Literal["test", "small", "medium", "large"], optional
Experiment size. Defaults to "small".
Returns
-------
DefaultExperiment
Default experiment.
Raises
------
ValueError
If the methods or experiment size are not valid.
"""
config = cls._generate_config(
dataset_config=dataset_config,
methods=methods,
experiment_size=experiment_size,
use_baseline=use_baseline,
)

return super().__init__(config=config)
30 changes: 25 additions & 5 deletions src/aequitas/flow/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hashlib
import json
import pickle
import pandas as pd
from pathlib import Path
from typing import Iterable, Optional, Tuple, Union

Expand Down Expand Up @@ -54,7 +55,7 @@ class Experiment:
def __init__(
self,
config_file: Optional[Path] = None,
config: Optional[DictConfig] = None,
config: Optional[Union[DictConfig, dict]] = None,
default_fields: Iterable[str] = ("methods", "datasets"),
save_artifacts: bool = True,
save_folder: Optional[Path] = Path("artifacts"),
Expand All @@ -65,9 +66,21 @@ def __init__(
self.logger = create_logger("Experiment")
self.logger.info("Instantiating Experiment class.")

self.dfs = {}
# Read config file
if config is not None:
self.config = config
if isinstance(config, DictConfig):
self.config = config
else:
# check if we have pandas dataframes passed as arguments
datasets = config["datasets"]
for dataset in datasets:
for name, conf in dataset.items():
if "args" in conf and "df" in conf["args"]:
self.dfs[name] = conf["args"]["df"]
conf["args"]["df"] = None
self.config = DictConfig(config)

elif config_file is not None:
self.config_reader = ConfigReader(
config_file, default_fields=default_fields
Expand Down Expand Up @@ -104,12 +117,19 @@ def _instantiate_sampler(self) -> BaseSampler:
return sampler(**self.config.optimization.sampler_args) # type: ignore

@staticmethod
def read_dataset(config: Union[dict, DictConfig]) -> Dataset:
def read_dataset(
config: Union[dict, DictConfig],
df: Optional[pd.DataFrame] = None,
) -> Dataset:
"""Read a dataset from a configuration object."""
if isinstance(config, dict):
config = DictConfig(config)
dataset_class = import_object(config.classpath)
dataset_object = dataset_class(**config.args) # type: ignore
# Casting args to dict, to add df if necessary
args = dict(config.args)
if df is not None:
args["df"] = df
dataset_object = dataset_class(**args) # type: ignore
return dataset_object

@staticmethod
Expand All @@ -128,7 +148,7 @@ def _read_datasets(self):
for dataset in self.config.datasets:
for name, configs in dataset.items(): # This iterates once.
self.logger.debug(f"Reading '{name}'. Configurations: {configs}.")
dataset_object = self.read_dataset(configs)
dataset_object = self.read_dataset(configs, self.dfs.get(name, None))
dataset_object.load_data()
dataset_object.create_splits()
self.logger.debug(f"Dataset {name} successfully read.")
Expand Down

0 comments on commit cdff72e

Please sign in to comment.