
brilliant return of good old datapreparator #121

Open
wants to merge 3 commits into base: main

Changes from all commits
338 changes: 280 additions & 58 deletions replay/data_preparator.py
@@ -6,17 +6,20 @@
``ToNumericFeatureTransformer`` leaves only numerical features
by one-hot encoding of some features and deleting the others.
"""
import logging
import string
from typing import Dict, List, Optional

from pyspark.ml.feature import StringIndexerModel, IndexToString, StringIndexer
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
from pyspark.sql.types import NumericType
from pyspark.sql.types import DoubleType, NumericType

from replay.constants import AnyDataFrame
from replay.session_handler import State
from replay.utils import convert2spark
from replay.utils import convert2spark, process_timestamp_column

LOG_COLUMNS = ["user_id", "item_id", "timestamp", "relevance"]


class Indexer: # pylint: disable=too-many-instance-attributes
@@ -161,79 +164,298 @@ def _reindex(self, df: DataFrame, entity: str):


class DataPreparator:
"""
Convert pandas DataFrame to Spark, rename columns and apply indexer.
"""Transforms data to a library format:
- read as a Spark dataframe / convert a pandas dataframe to Spark
- check for nulls
- create relevance/timestamp columns if absent
- convert dates to TimestampType

Examples:

Loading log DataFrame

>>> import pandas as pd
>>> from replay.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": [2, 2, 2, 1],
... "item_id": [1, 2, 3, 3],
... "rel": [5, 5, 5, 5]}
... )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
... columns_mapping={"user_id": "user",
... "item_id": "item_id",
... "relevance": "rel"}
... )
>>> correct_log.show(2)
+-------+-------+---------+-------------------+
|user_id|item_id|relevance| timestamp|
+-------+-------+---------+-------------------+
| 2| 1| 5.0|2099-01-01 00:00:00|
| 2| 2| 5.0|2099-01-01 00:00:00|
+-------+-------+---------+-------------------+
only showing top 2 rows
<BLANKLINE>


Loading user features

>>> import pandas as pd
>>> from replay.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": ["user1", "user1", "user2"],
... "f0": ["feature1","feature2","feature1"],
... "f1": ["left","left","center"],
... "ts": ["2019-01-01","2019-01-01","2019-01-01"]}
... )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
... columns_mapping={"user_id": "user"},
... )
>>> correct_log.show(3)
+-------+--------+------+----------+
|user_id| f0| f1| ts|
+-------+--------+------+----------+
| user1|feature1| left|2019-01-01|
| user1|feature2| left|2019-01-01|
| user2|feature1|center|2019-01-01|
+-------+--------+------+----------+
<BLANKLINE>

"""

def __init__(self):
self.indexer = Indexer()
_logger: Optional[logging.Logger] = None

def __call__(
self,
log: AnyDataFrame,
user_features: Optional[AnyDataFrame] = None,
item_features: Optional[AnyDataFrame] = None,
mapping: Optional[Dict] = None,
) -> tuple:
"""
Convert ids into idxs for provided DataFrames

:param log: historical log of interactions
``[user_id, item_id, timestamp, relevance]``
:param user_features: user features (must have ``user_id``)
:param item_features: item features (must have ``item_id``)
:param mapping: dictionary mapping "default column name:
column name in input DataFrame"
``user_id`` and ``item_id`` mappings are required,
``timestamp`` and``relevance`` are optional.
:return: three converted DataFrames
"""
log, user_features, item_features = [
convert2spark(df) for df in [log, user_features, item_features]
]
log, user_features, item_features = [
self._rename(df, mapping)
for df in [log, user_features, item_features]
]
if user_features:
users = log.select("user_id").union(
user_features.select("user_id")
)
else:
users = log.select("user_id")
if item_features:
items = log.select("item_id").union(
item_features.select("item_id")
)
@property
def logger(self) -> logging.Logger:
"""
:returns: library logger
"""
if self._logger is None:
self._logger = logging.getLogger("replay")
return self._logger

@staticmethod
def read_as_spark_df(
data: Optional[AnyDataFrame] = None,
path: Optional[str] = None,
format_type: Optional[str] = None,
**kwargs,
) -> DataFrame:
"""
Read a Spark dataframe from a file or convert a pandas dataframe to Spark.

:param data: DataFrame to process (either ``data`` or ``path`` should be defined)
:param path: path to data (either ``data`` or ``path`` should be defined)
:param format_type: file type, one of ``[csv, parquet, json, table]``
:param kwargs: extra arguments passed to
``spark.read.<format>(path, **kwargs)``
:return: spark DataFrame
"""
if data is not None:
dataframe = convert2spark(data)
elif path and format_type:
spark = State().session
if format_type == "csv":
dataframe = spark.read.csv(path, inferSchema=True, **kwargs)
elif format_type == "parquet":
dataframe = spark.read.parquet(path)
elif format_type == "json":
dataframe = spark.read.json(path, **kwargs)
elif format_type == "table":
dataframe = spark.read.table(path)
else:
raise ValueError(
f"Invalid value of format_type='{format_type}'"
)
else:
items = log.select("item_id")
self.indexer.fit(users, items)
raise ValueError("Either `data` or both `path` and `format_type` must be provided")
return dataframe
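
A minimal usage sketch for the new `read_as_spark_df` helper (not part of this diff); the file name `interactions.csv` and the `sep` reader option are assumptions for illustration.

from replay.data_preparator import DataPreparator

# Read a CSV file into a Spark DataFrame; extra keyword arguments are
# forwarded to spark.read.csv (the separator below is a hypothetical option).
log_df = DataPreparator.read_as_spark_df(
    path="interactions.csv",
    format_type="csv",
    sep=";",
)

Passing an in-memory pandas or Spark DataFrame via `data=` skips the file reading branch entirely.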

log = self.indexer.transform(log)
if user_features:
user_features = self.indexer.transform(user_features)
if item_features:
item_features = self.indexer.transform(item_features)
def check_df(
self, dataframe: DataFrame, columns_mapping: Dict[str, str]
) -> None:
"""
Check:
- if dataframe is not empty,
- if columns from ``columns_mapping`` are present in dataframe
- warn about nulls in columns from ``columns_mapping``
- warn about absence of ``timestamp/relevance`` columns for an interactions log
- warn about wrong relevance DataType

:param dataframe: spark DataFrame to process
:param columns_mapping: dictionary mapping "key: column name in input DataFrame".
Possible keys: ``[user_id, item_id, timestamp, relevance]``
The set of ``columns_mapping`` keys specifies the nature of the DataFrame:
- if both ``user_id`` and ``item_id`` are present,
then the dataframe is a log of interactions.
Specify ``timestamp, relevance`` columns in mapping if available.
- if either ``user_id`` or ``item_id`` is present,
then the dataframe is a dataframe of user/item features
"""
if not dataframe.head(1):
raise ValueError("DataFrame is empty")

return log, user_features, item_features
for value in columns_mapping.values():
if value not in dataframe.columns:
raise ValueError(
f"Column `{value}` stated in mapping is absent in dataframe"
)

for column in columns_mapping.values():
if dataframe.where(sf.col(column).isNull()).count() > 0:
self.logger.info(
"Column `%s` has NULL values. Handle NULL values before "
"the next data preprocessing/model training steps",
column,
)

if (
"user_id" in columns_mapping.keys()
and "item_id" in columns_mapping.keys()
):
absent_cols = set(LOG_COLUMNS).difference(columns_mapping.keys())
if len(absent_cols) > 0:
self.logger.info(
"Columns %s are absent, but may be required for models training. "
"Add them with DataPreparator().generate_absent_log_cols",
list(absent_cols),
)
if "relevance" in columns_mapping.keys():
if not isinstance(
dataframe.schema[columns_mapping["relevance"]].dataType,
NumericType,
):
self.logger.info(
"Relevance column `%s` should be numeric, " "but it is %s",
columns_mapping["relevance"],
dataframe.schema[columns_mapping["relevance"]].dataType,
)
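
For completeness, a small sketch (not in the diff) of calling `check_df` directly; the toy pandas frame and column names are illustrative assumptions.

import pandas as pd

from replay.data_preparator import DataPreparator
from replay.utils import convert2spark

df = convert2spark(pd.DataFrame({"user": [1, 2, 2], "item": [3, 4, 5]}))
# Raises ValueError if the frame is empty or a mapped column is missing;
# otherwise it only logs informational messages (nulls, absent timestamp/relevance).
DataPreparator().check_df(df, columns_mapping={"user_id": "user", "item_id": "item"})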

@staticmethod
def _rename(df: DataFrame, mapping: Dict) -> DataFrame:
def add_absent_log_cols(
dataframe: DataFrame,
columns_mapping: Dict[str, str],
default_relevance: float = 1.0,
default_ts: str = "2099-01-01",
):
"""
Add ``relevance`` and ``timestamp`` columns with default values if
``relevance`` or ``timestamp`` is absent among mapping keys.

:param dataframe: interactions log to process
:param columns_mapping: dictionary mapping "key: column name in input DataFrame".
Possible keys: ``[user_id, item_id, timestamp, relevance]``
:param default_relevance: default value for generated `relevance` column
:param default_ts: str, default value for generated `timestamp` column
:return: spark DataFrame with generated ``timestamp`` and ``relevance`` columns
if absent in original dataframe
"""
absent_cols = set(LOG_COLUMNS).difference(columns_mapping.keys())
if "relevance" in absent_cols:
dataframe = dataframe.withColumn(
"relevance", sf.lit(default_relevance).cast(DoubleType())
)
if "timestamp" in absent_cols:
dataframe = dataframe.withColumn(
"timestamp", sf.to_timestamp(sf.lit(default_ts))
)
return dataframe
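
A hedged illustration (not part of the diff) of `add_absent_log_cols` filling in default columns when only ids are mapped; the toy data is assumed.

import pandas as pd

from replay.data_preparator import DataPreparator
from replay.utils import convert2spark

log = convert2spark(pd.DataFrame({"user_id": [1, 2], "item_id": [10, 20]}))
# Only ids are in the mapping, so a `relevance` column (default 1.0, DoubleType)
# and a `timestamp` column (default 2099-01-01) are added.
full_log = DataPreparator.add_absent_log_cols(
    dataframe=log,
    columns_mapping={"user_id": "user_id", "item_id": "item_id"},
)
full_log.show()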

@staticmethod
def _rename(df: DataFrame, mapping: Dict) -> Optional[DataFrame]:
"""
Rename dataframe columns according to the mapping.
"""
if df is None or mapping is None:
return df
for out_col, in_col in mapping.items():
if in_col in df.columns:
df = df.withColumnRenamed(in_col, out_col)
return df

def back(self, df: DataFrame) -> DataFrame:
# pylint: disable=too-many-arguments
def transform(
self,
columns_mapping: Dict[str, str],
data: Optional[AnyDataFrame] = None,
path: Optional[str] = None,
format_type: Optional[str] = None,
date_format: Optional[str] = None,
reader_kwargs: Optional[Dict] = None,
) -> DataFrame:
"""
Convert DataFrame to the initial indexes.

:param df: DataFrame with idxs
:return: DataFrame with ids
Transforms log, user or item features into a Spark DataFrame
``[user_id, item_id, timestamp, relevance]``,
``[user_id, *features]``, or ``[item_id, *features]``.
Input is either a file of ``format_type`` at ``path``,
or a ``pandas.DataFrame``/``spark.DataFrame``.
Transform performs:
- read the dataframe / convert it to a Spark DataFrame
- check the dataframe (nulls, ``columns_mapping``)
- rename columns from mapping to standard names (user_id, item_id, timestamp, relevance)
- for interactions log: create absent columns,
convert ``timestamp`` column to TimestampType and ``relevance`` to DoubleType

:param columns_mapping: dictionary mapping "key: column name in input DataFrame".
Possible keys: ``[user_id, item_id, timestamp, relevance]``
The set of ``columns_mapping`` keys specifies the nature of the DataFrame:
- if both ``user_id`` and ``item_id`` are present,
then the dataframe is a log of interactions.
Specify ``timestamp, relevance`` columns in mapping if present.
- if either ``user_id`` or ``item_id`` is present,
then the dataframe is a dataframe of user/item features

:param data: DataFrame to process
:param path: path to data
:param format_type: file type, one of ``[csv, parquet, json, table]``
:param date_format: format for the ``timestamp`` column
:param reader_kwargs: extra arguments passed to
``spark.read.<format>(path, **reader_kwargs)``
:return: processed DataFrame
"""
return self.indexer.inverse_transform(df)
is_log = False
if (
"user_id" in columns_mapping.keys()
and "item_id" in columns_mapping.keys()
):
self.logger.info(
"Columns with ids of users or items are present in mapping. "
"The dataframe will be treated as an interactions log."
)
is_log = True
elif (
"user_id" not in columns_mapping.keys()
and "item_id" not in columns_mapping.keys()
):
raise ValueError(
"Mapping either for user ids or for item ids is not stated in `columns_mapping`"
)
else:
self.logger.info(
"Column with ids of users or items is absent in mapping. "
"The dataframe will be treated as a users'/items' features dataframe."
)
reader_kwargs = {} if reader_kwargs is None else reader_kwargs
dataframe = self.read_as_spark_df(
data=data, path=path, format_type=format_type, **reader_kwargs
)
self.check_df(dataframe, columns_mapping=columns_mapping)
dataframe = self._rename(df=dataframe, mapping=columns_mapping)
if is_log:
dataframe = self.add_absent_log_cols(
dataframe=dataframe, columns_mapping=columns_mapping
)
dataframe = dataframe.withColumn(
"relevance", sf.col("relevance").cast(DoubleType())
)
dataframe = process_timestamp_column(
dataframe=dataframe,
column_name="timestamp",
date_format=date_format,
)

return dataframe
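
As an end-to-end sketch (not part of the diff), `transform` can also read a log straight from a file; the path, the csv separator and the `yyyy-MM-dd` date format below are illustrative assumptions.

from replay.data_preparator import DataPreparator

preparator = DataPreparator()
log = preparator.transform(
    columns_mapping={
        "user_id": "user",
        "item_id": "item",
        "timestamp": "dt",
        "relevance": "rating",
    },
    path="interactions.csv",       # hypothetical file
    format_type="csv",
    date_format="yyyy-MM-dd",      # assumed datetime pattern for the `dt` column
    reader_kwargs={"sep": ","},
)
log.show(5)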


class CatFeaturesTransformer: