/
imbalance
executable file
·329 lines (274 loc) · 11.6 KB
/
imbalance
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#!/usr/bin/env python3
from __future__ import division
import numpy as np
from lmfit import Parameters, minimize
class Aniso(object):
""" Anisotropy detection channels """
def __init__(self, par, sen):
self.par = par
self.sen = sen
def map(self, f):
return Aniso(f(self.par), f(self.sen))
def estimate_period(y, period=1562.5, error=False):
"""
Estimate the period (measured in bins) from data or IRF.
:type y: Histogram
:type period:`float` or `int`
:param period: Initial estimate of period (in bins), must be close
:type error: `bool`
:param error: Whether to return estimate of error on period
"""
from scipy.interpolate import interp1d
from scipy.optimize import brentq
from scipy.misc import derivative
ts = np.arange(y.size)
y_int = interp1d(ts, y)
y_int_prime = lambda x0: derivative(y_int, x0, dx=1e-6)
# Must remove dt from each end of range to be able to take derivative
tps = np.linspace(1, y.size-2,1e4)
yps = y_int_prime(tps)
# Smoothing of the derivative is necessary if the statistics are not sufficient
# which can happen in the perpendicular channel
def smooth(x):
""" Smooth signal by convolution with `hamming` window of length 10. """
window_len = 10
x = np.r_[2*x[0]-x[window_len:1:-1], x, 2*x[-1]-x[-1:-window_len:-1]]
w = np.hamming(window_len)
x = np.convolve(w/w.sum(), x, mode='same')
return x[window_len-1:-window_len+1]
yps = smooth(yps)
t1 = np.median(tps[np.where(yps == yps.max())])
# Search around t1 for serveral measurements of the period. delta_t may
# need to be adjusted for different bin widths. Since the maximum of slope
# occurs near the peak we extend the range further below t1.
delta_t = 2
t1s = np.linspace(t1-2*delta_t, t1+delta_t, 10)
periods, t2s = [], []
for t1 in t1s:
t2 = t1-period if t1-period>0 else t1+period
for dt in np.linspace(0,2*delta_t):
a, b = t2-dt, t2+dt
if (y_int(a) < y_int(t1) and y_int(b) > y_int(t1)):
t2 = brentq(lambda t: y_int(t) - y_int(t1), a, b)
t2s.append(t2)
periods.append(np.abs(t2-t1))
break
else:
continue
t1s = np.array(t1s)
periods = np.array(periods)
debug = False
if debug:
print('Period: %.2f +/- %.2f bins' % (periods.mean(), periods.std()))
print('Period: %.2f +/- %.2f ps\n' % (periods.mean()*8, periods.std()*8))
y1s = y_int(t1s)
y2s = y_int(t2s)
pl.plot(y)
#pl.plot(tps,yps, '^', markersize=2)
pl.plot(t1s, y1s, 'x', t2s, y2s, 'o')
pl.yscale('log')
if error:
return periods.mean(), periods.std()
else:
return periods.mean()
def interpolate(signal, period=1562.5, offset=0):
"""
Interpolate a signal or histogram for convenient fitting of offset and relative
amplitudes.
"""
from scipy.interpolate import interp1d
periodic_signal_f = interp1d(np.arange(len(signal)), signal, assume_sorted=True)
ts = np.linspace(0,period, num=1e4) + offset
ts %= period
# sometimes numerical error gives us values slightly outside
# the interpolation range, be sure to clamp these
ts = np.clip(ts, a_min=0, a_max = len(signal) - 1)
return periodic_signal_f(ts)
class Fit(object):
def __init__(self, params):
"""
Provides global fitting of the detector imbalance and channel offsets
using the lmfit package. The model calculates g and offsets for each dataset
such that parallel channel = g * perpendicular channel (shifted by offset).
The fit will return the offsets measured in bins and the imbalance g is the
ratio of amplitudes of the parallel channel to the perpendicular channel.
:type params: lmfit `Parameters` object
:param params: Initially only containing parameter g, the detector imbalance
"""
self._anisos = []
self._offset_names = []
self._periods = []
self._params = params
def add_curve(self, aniso, offset_name):
"""
Add a dataset to be fit.
:type aniso: `Aniso` of histogram
:type offset_name: str
:param offset_name: Name of offset parameter in parameters object
"""
self._anisos.append(aniso)
period = aniso.map(estimate_period)
self._periods.append(period)
self._offset_names.append(offset_name)
print('Component %d par period: %.2f' % (len(self._anisos), period.par))
print('Component %d sen period: %.2f' % (len(self._anisos), period.sen))
def period(self):
""" Average period from all datasets and both channels """
period_sum = 0
for p in self._periods:
period_sum += p.par + p.sen
period = period_sum/len(self._periods)/2
return period
def unpack(self, params):
"""
Unpack values and curves from params object.
:rtype: tuple of (g, pars, sens, weights)
"""
g = params['g'].value
offsets = [params[name].value for name in self._offset_names]
pars, sens, weights = [], [], []
for aniso, period, offset in zip(self._anisos, self._periods, offsets):
par, sen = interpolate(aniso.par, period.par), interpolate(aniso.sen, period.sen, offset)
weight = 1./np.sqrt(par+sen)
pars.append(par)
sens.append(sen)
weights.append(weight)
return g, pars, sens, weights
def residual(self, params):
""" Calculate the residual of the model (par-g*sen)/np.sqrt(par+sen) """
g, pars, sens, weights = self.unpack(params)
resList = []
for par, sen, weight in zip(pars, sens, weights):
resList.append((par-g*sen)*weight)
return np.concatenate(resList)
def params(self):
return self._params
def print_redchis(self):
""" Print reduced chisquares of each dataset """
g, pars, sens, weights = self.unpack(self._params)
print('Reduced Chisquares:')
for idx, (par, sen, weight) in enumerate(zip(pars, sens, weights)):
redchi = np.sum( ((par-g*sen)*weight)**2 ) / (len(par)-2)
print('Component %d: %.2f' % (idx, redchi))
def print_gs(self):
"""
Print the value of g as calculated using the formula:
g = np.sum(par*sen/(par+sen))/np.sum(sen**2/(par+sen))
This formula is obtained by setting the derivative of the chisquare to 0
and solving for g.
"""
g, pars, sens, weights = self.unpack(self._params)
print('\nCalculation of g from equation using fitted offset:')
for idx, (par, sen) in enumerate(zip(pars, sens)):
g = np.sum(par*sen/(par+sen))/np.sum(sen**2/(par+sen))
print('Component %d: %.2f' % (idx, g))
def plot(self):
""" Plot the fit result. """
g, pars, sens, weights = self.unpack(self._params)
import matplotlib.pyplot as pl
fig, (ax0, ax1) = pl.subplots(2, sharex=True)
for idx, (par, sen) in enumerate(zip(pars, sens)):
ax0.plot(par, '.', ms=2, alpha=0.2, label='C%d par' % idx)
ax0.plot(g*sen, '.', ms=2, alpha=0.2, label='C%d g*sen' % idx)
ax1.plot((par-g*sen)/np.sqrt(par+sen), '.', ms=2, alpha=0.2)
ax0.set_yscale('log')
pl.xlim((0,len(par)))
fig.legend(ax0.lines, [line.get_label() for line in ax0.lines], 'right')
pl.show()
def fit(self):
"""
Fit the model after initializing and adding at least one curve with self.add_curve.
:rtype: lmfit `minimize` fit result
"""
print("Average period: %.2f" % self.period())
from lmfit import minimize, report_errors
result = minimize(self.residual, self._params, method='leastsq')
self.print_redchis()
print("Complete Fit: %.2f\n" % result.redchi)
report_errors(result.params)
self.print_gs()
self.plot()
return result
class Fit_Single_Offset(Fit):
def __init__(self, params):
"""
Provides global fitting of the detector imbalance and a single global channel
offset using the lmfit package. The model calculates g and an offset for all datasets
such that parallel channel = g * perpendicular channel (shifted by offset).
The fit will return the offset measured in bins and the imbalance g is the
ratio of amplitudes of the parallel channel to the perpendicular channel.
:type params: lmfit `Parameters` object
:param params: Needs to contain a parameter `g` for the detector imbalance
and a parameter `offset` for the offset between par and sen channels
"""
self._params = params
self._anisos = []
self._offset_names = []
self._periods = []
def add_curve(self, aniso):
"""
Add a dataset to be fit.
:type aniso: `Aniso` of histogram
:type offset_name: str
:param offset_name: Name of offset parameter in parameters object
"""
self._anisos.append(aniso)
period = aniso.map(estimate_period)
self._periods.append(period)
print('Component %d par period: %.2f' % (len(self._anisos), period.par))
print('Component %d sen period: %.2f' % (len(self._anisos), period.sen))
def unpack(self, params):
"""
Unpack values and curves from params object.
:rtype: tuple of (g, pars, sens, weights)
"""
g = params['g'].value
offset = params['offset'].value
period = self.period()
pars, sens, weights = [], [], []
for aniso in self._anisos:
par, sen = interpolate(aniso.par, period), interpolate(aniso.sen, period, offset)
weight = 1./np.sqrt(par+sen)
pars.append(par)
sens.append(sen)
weights.append(weight)
return g, pars, sens, weights
def fit(anisos, single_offset=False):
"""
:type anisos: [`Aniso`] of histograms
:params anisos: Fluorescence depolarization histograms
:params single_offset: Whether to fit a single offset or one offset per dataset
"""
params = Parameters()
params.add('g', 1)
if single_offset:
params.add('offset', 80)
fit = Fit_Single_Offset(params)
for aniso in anisos:
fit.add_curve(aniso)
return fit.fit()
else:
fit = Fit(params)
for pair_idx, aniso in enumerate(anisos):
offset_name = 'offset%d' % pair_idx
params.add(offset_name,80)
fit.add_curve(aniso, offset_name)
return fit.fit()
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('corr', metavar='FILE', nargs='+', type=argparse.FileType('r'),
help="correlation function")
parser.add_argument('--rep-rate', '-r', type=float,
help='pulse repetition rate (Hertz)')
parser.add_argument('--single-offset', '-so', action='store_true',
help='fit a single offset only')
args = parser.parse_args()
corrs = [Aniso(a,b) for a,b in zip(args.corr[::2], args.corr[1::2])]
for idx, corr in enumerate(corrs):
print('Component %d par: %s' % (idx, corr.par.name))
print('Component %d sen: %s' % (idx, corr.sen.name))
corrs = [aniso.map(lambda name: np.genfromtxt(name, names='time,counts')['counts']) for aniso in corrs]
result = fit(corrs, single_offset=args.single_offset)
if __name__=='__main__':
main()