-
Notifications
You must be signed in to change notification settings - Fork 2
/
factor_risk_parity.py
247 lines (197 loc) · 9.83 KB
/
factor_risk_parity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# Factor Risk Parity portfolio construction approach
import pandas as pd
import stock_data
import factor_data
import statsmodels.api as sm
import numpy as np
from scipy.optimize import minimize
from dateutil.relativedelta import relativedelta
from alive_progress import alive_bar
from alive_progress import config_handler
config_handler.set_global(force_tty=True)
def get_loading_matrix(stocks, factors):
""" Obtains factor to stocks loading matrix
:param stocks: DataFrame of all stocks considered
:param factors: DataFrame of all factors considered
:return: Matrix with factor to stocks loadings
"""
model = sm.OLS(stocks, factors).fit()
parameters = model.params
loading_matrix = pd.DataFrame(data=parameters.values, index=parameters.index,
columns=stocks.columns).T
return loading_matrix
def get_risk_contributions(asset_weights, loadings_matrix, Sigma):
""" Gets the risk contributions of risk factor to the portfolio
:param asset_weights: weight vector for all assets
:param loadings_matrix: loading matrix of factors to stocks
:param Sigma: covariance matrix of stocks
:return: Risk contributions of factors to portfolio
"""
# add size debug
x = asset_weights
# change Sigma based of results from OLS
vol_x = np.sqrt(np.matmul(np.matmul(x, Sigma), x))
Aplus = np.linalg.pinv(loadings_matrix)
AT_x = np.matmul(loadings_matrix.values.T, x)
Aplus_Sigma_x = np.matmul(np.matmul(Aplus, Sigma), x)
risk_contributions = (AT_x * Aplus_Sigma_x) / vol_x
return risk_contributions
def big_sigma(stock_returns):
"""Get Covariance matrix of stock returns
:param stock_returns: DataFrame of stock returns
:return: Covariance matrix
"""
sigma = stock_returns.cov().values
return sigma
def sigma_x(x, Sigma):
""" volatility of portfolio
:param x: weight vector of assets
:param Sigma: Covariance matrix of assets
:return: portfolio volatility (or standard deviation)
"""
vol_x = np.sqrt(np.matmul(np.matmul(x, Sigma), x))
return vol_x
def sigma_x_rc(x, loadings_matrix, sigma):
""" volatility of portfolio as sum of factor risk contributions
:param x: asset weight vector
:param loadings_matrix: factor to sotcks loading matrix
:param sigma: assets covariance matrix
:return: portfolio volatility
"""
vol_x = get_risk_contributions(x, loadings_matrix, sigma).sum()
return vol_x
def weights_factor_risk_parity(stocks, factor_structure, loadings_matrix, Sigma, x0):
""" Calculates assets weights according to the factor risk parity approach
:param stocks: DataFrame of stock returns
:param factor_structure: Structure of factor clusters (factors that share risk budgets)
:param loadings_matrix: Factor to stocks loading matrix
:param Sigma: stock returns covariance matrix
:param x0: asset weighs vector for initialization
:return: asset weights vector using factor risk parity method
"""
n_stocks = stocks.shape[1]
if x0 is None:
x0 = np.ones(n_stocks) * 1 / n_stocks
sigma = big_sigma(stocks)
def square(listt):
return [i ** 2 for i in listt]
fun = lambda x: sum(square(get_risk_contributions(x, loadings_matrix, Sigma) /
sigma_x(x, sigma) - 1 /
loadings_matrix.shape[1]))
# constrains
cons = [{'type': 'ineq', 'fun': lambda x: -sum(x) + 1},
{'type': 'ineq', 'fun': lambda x: sum(x) - 1},
{'type': 'ineq', 'fun': lambda x: np.matmul(loadings_matrix.values.T, x) - (-0.25)},
{'type': 'ineq', 'fun': lambda x: -np.matmul(loadings_matrix.values.T, x) + 1}
]
# bounds
bounds_short_lev = [(-1 / n_stocks, 1) for n in range(n_stocks)]
bounds_long = [(0, 1) for n in range(n_stocks)]
bounds = bounds_short_lev
res = minimize(fun, x0, method='SLSQP', bounds=bounds, constraints=cons, tol=1e-5, options={'disp': False})
print(res.fun)
return res.x
def portfolio_weights_factor_risk_parity(tickers, factor_tickers, start_date, end_date, portfolio_rebalance_period):
""" Applies factor risk parity over a period of time. Can be used for back testing
:param tickers: List of tickers of all candidate stocks to the portfolio
:param factor_tickers: List of tickers of factor used and respective cluster format
:param start_date: first date of the investment period
:param end_date: last date of the investment period
:param portfolio_rebalance_period: portfolio re-balancing period (monthly, weekly, etc.)
:return: DataFrame of asset weight vectors for each portfolio rebalancing date
"""
factor_structure = []
factor_tickers_flat = []
for group in factor_tickers:
if type(group) is list:
factor_structure.append(len(group))
for factor in group:
factor_tickers_flat.append(factor)
else:
factor_structure.append(1)
factor_tickers_flat.append(group)
business_days_end_months = pd.date_range(start_date, end_date, freq=portfolio_rebalance_period)
portfolio_weights = pd.DataFrame(index=business_days_end_months, columns=tickers)
x0 = None
with alive_bar(len(business_days_end_months)) as bar:
for t in business_days_end_months:
stocks = stock_data.get_daily_returns(tickers, t + relativedelta(months=-12), t)[1:]
factors = factor_data.get_factors(factor_tickers_flat, stocks.index[0], stocks.index[-1])
loadings_matrix = get_loading_matrix(stocks, factors)
sigma = big_sigma(stocks)
portfolio_weights.loc[t] = weights_factor_risk_parity_shared_rc(stocks, factor_structure, loadings_matrix,
sigma, x0)
x0 = portfolio_weights.loc[t]
print((get_risk_contributions(x0, loadings_matrix, sigma) / sigma_x_rc(x0, loadings_matrix, sigma)))
print(np.matmul(loadings_matrix.T, x0))
bar()
return portfolio_weights
def weights_factor_risk_parity_shared_rc(stocks, factor_structure, loadings_matrix, Sigma, x0):
n_stocks = stocks.shape[1]
if x0 is None:
x0 = np.ones(n_stocks) * 1 / n_stocks
# sigma = big_sigma(stocks)
def square(listt):
return [i ** 2 for i in listt]
def fun(x):
risk_contributions = get_risk_contributions(x, loadings_matrix, Sigma)
total_risk_contributions = sum(risk_contributions)
clusters_rcs = [part.sum() for part in np.split(risk_contributions, np.cumsum(factor_structure))[:-1]]
f = sum(square(clusters_rcs / total_risk_contributions - 1 / len(factor_structure)))
return f
# constrains
cons = [{'type': 'ineq', 'fun': lambda x: -sum(x) + 1},
{'type': 'ineq', 'fun': lambda x: sum(x) - 1},
{'type': 'ineq',
'fun': lambda x: np.array([part.sum() for part in np.split(np.matmul(loadings_matrix.values.T, x),
np.cumsum(factor_structure))[:-1]]) - (
-.25)},
{'type': 'ineq',
'fun': lambda x: -np.array([part.sum() for part in np.split(np.matmul(loadings_matrix.values.T, x),
np.cumsum(factor_structure))[:-1]])
+ 1},
{'type': 'ineq', 'fun': lambda x: np.matmul(loadings_matrix.values.T, x) - (-.25)},
{'type': 'ineq', 'fun': lambda x: -np.matmul(loadings_matrix.values.T, x) + 1},
{'type': 'ineq', 'fun': lambda x: get_risk_contributions(x, loadings_matrix, Sigma) - 0}
]
# avoid negative RC in the last equation for the shared RC case
# bounds
bounds_short_lev = [(-1 / n_stocks, 1) for n in range(n_stocks)]
bounds_long = [(0, 1) for n in range(n_stocks)]
bounds = bounds_short_lev
res = minimize(fun, x0, method='SLSQP', bounds=bounds, constraints=cons, tol=1e-5, options={'disp': False})
print(res.fun)
return res.x
def portfolio_weights_factor_risk_parity_intersection(tickers, factor_tickers, start_date, end_date,
portfolio_rebalance_period):
factor_structure = []
factor_tickers_flat = []
for group in factor_tickers:
if type(group) is list:
factor_structure.append(len(group))
for factor in group:
factor_tickers_flat.append(factor)
else:
factor_structure.append(1)
factor_tickers_flat.append(group)
business_days_end_months = pd.date_range(start_date, end_date, freq=portfolio_rebalance_period)
portfolio_weights = pd.DataFrame(index=business_days_end_months, columns=tickers)
x0 = None
with alive_bar(len(business_days_end_months)) as bar:
for t in business_days_end_months:
stocks = stock_data.get_daily_returns(tickers, t + relativedelta(months=-12), t)[1:]
factors = factor_data.get_factors(factor_tickers_flat, stocks.index[0], stocks.index[-1])
#factor intersection
f1 = factors['SMB']
f2 = factors['MOM']
f3 = factors['CMA'] * 0.5 + factors['HML_Devil'] * 0.5
f4 = factors['BaB'] * 0.5 + factors['RMW'] * 0.25 + factors['QMJ'] * 0.25
factors = pd.DataFrame([f1, f2, f3, f4]).T
loadings_matrix = get_loading_matrix(stocks, factors)
sigma = big_sigma(stocks)
portfolio_weights.loc[t] = weights_factor_risk_parity(stocks, factor_structure, loadings_matrix, sigma, x0)
x0 = portfolio_weights.loc[t]
print((get_risk_contributions(x0, loadings_matrix, sigma) / sigma_x_rc(x0, loadings_matrix, sigma)))
print(np.matmul(loadings_matrix.T, x0))
bar()
return portfolio_weights