entropy.py
import numpy as np
import scipy.sparse

from .base import ValueFunction


class Entropy(ValueFunction):
"""Entropy
The entropy of an item i is calculated using the relative frequency of
the possible ratings. In general, since entropy measures the spread of
ratings for an item, this strategy tends to promote rarely rated items,
which can be considerably informative.
References
----------
.. Mehdi Elahi, Francesco Ricci, and Neil Rubens. 2016. A survey of active learning
in collaborative filtering recommender systems. Computer Science Review 20 (2016), 29–50.
"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @staticmethod
    def probabilities_entropy(probabilities):
        # p * log(p) tends to 0 as p -> 0, but evaluating np.log(0) yields
        # -inf (and 0 * -inf yields nan), so zero entries are dropped first.
        probabilities = np.asarray(probabilities)
        probabilities = probabilities[probabilities > 0]
        if probabilities.size == 0:
            return 0.0
        return -np.sum(probabilities * np.log(probabilities))

    @staticmethod
    def values_entropy(values):
        # Empirical distribution of the observed rating values.
        _, counts = np.unique(values, return_counts=True)
        values_probability = counts / np.sum(counts)
        return Entropy.probabilities_entropy(values_probability)
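
    # Worked example (illustrative): for the ratings [1, 1, 2, 5] the
    # relative frequencies are [0.5, 0.25, 0.25], so values_entropy returns
    # -(0.5*ln 0.5 + 2 * 0.25*ln 0.25) ≈ 1.04, while a unanimous vector such
    # as [4, 4, 4] scores 0.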

    @staticmethod
    def get_items_entropy(consumption_matrix):
        # The smallest value in the matrix is assumed to mark missing
        # ratings, so only entries above it count as observed.
        lowest_value = np.min(consumption_matrix)
        items_entropy = np.zeros(consumption_matrix.shape[1])
        is_spmatrix = isinstance(consumption_matrix, scipy.sparse.spmatrix)
        if is_spmatrix:
            # CSC format makes the column slices below efficient.
            consumption_matrix = scipy.sparse.csc_matrix(consumption_matrix)
        for iid in range(consumption_matrix.shape[1]):
            if is_spmatrix:
                iid_ratings = consumption_matrix[:, iid].toarray().ravel()
            else:
                iid_ratings = consumption_matrix[:, iid]
            iid_ratings = iid_ratings[iid_ratings > lowest_value]
            items_entropy[iid] = Entropy.values_entropy(iid_ratings)
        return items_entropy

    def reset(self, observation):
        train_dataset = observation
        super().reset(train_dataset)
        self.train_dataset = train_dataset
        self.num_total_items = self.train_dataset.num_total_items

        # One rating-count histogram per item, with one column per value in
        # the dataset's rating domain.
        self.unique_values = self.train_dataset.rate_domain
        self.num_unique_values = len(self.unique_values)
        self.items_ratings = np.zeros(
            (self.num_total_items, self.num_unique_values))
        self.unique_values_ids = dict(
            zip(self.unique_values, range(self.num_unique_values)))
        self.items_num_total_ratings = np.zeros(self.num_total_items)
        for uid, iid, reward, *rest in self.train_dataset.data:
            self.items_ratings[int(iid), self.unique_values_ids[reward]] += 1
            self.items_num_total_ratings[int(iid)] += 1
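
    # Illustration (assuming a 1-5 rating domain): items_ratings[i, j] counts
    # how often item i received the j-th rating value, so
    # items_ratings[i] / items_num_total_ratings[i] is the empirical rating
    # distribution whose entropy actions_estimate computes below.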

    def actions_estimate(self, candidate_actions):
        uid = candidate_actions[0]
        candidate_items = candidate_actions[1]
        # Score each candidate by the entropy of its empirical rating
        # distribution; items with no ratings yet get a score of 0.
        items_score = [
            self.probabilities_entropy(self.items_ratings[iid] /
                                       np.sum(self.items_ratings[iid]))
            if self.items_num_total_ratings[iid] > 0 else 0
            for iid in candidate_items
        ]
        return items_score, None

    def update(self, observation, action, reward, info):
        uid = action[0]
        item = action[1]
        additional_data = info
        # Incrementally update the item's rating histogram; rewards outside
        # the known rating domain are ignored.
        if reward in self.unique_values_ids:
            self.items_ratings[item, self.unique_values_ids[reward]] += 1
            self.items_num_total_ratings[item] += 1
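

# Minimal usage sketch of the static helpers (illustrative; the matrix below
# and the use of 0 as the "missing rating" marker are assumptions, not part
# of this module). The relative import above means this file runs only in
# its package context.
if __name__ == "__main__":
    ratings = np.array([
        [1, 0, 5],
        [2, 0, 5],
        [5, 0, 5],
    ])
    # Item 0 has three distinct ratings (entropy ln 3 ≈ 1.10), item 1 has no
    # ratings, and item 2 is rated unanimously (entropy 0).
    print(Entropy.get_items_entropy(ratings))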