/
BTS.py
1584 lines (1175 loc) · 46.4 KB
/
BTS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import numpy
import matplotlib.pyplot
from scipy.optimize import curve_fit
from astropy.convolution import convolve, Gaussian1DKernel, convolve_fft
import astropy.io.fits
import os
############ Fitting routine itself
def fit_single_line(vel,x,mask,params):
####### Unpack the parameter array
debug = params["debug"]
smooth = params["smoothing_length"]
var_noise = params["variable_noise"]
noise_level = params["noise_level"]
n = params["signal_to_noise_ratio"]
max_peaks = params["max_peaks"]
max_it = params["max_iterations"]
daic = params["delta_AIC_limit"]
min_num_channels = params["min_velocity_channels"]
min_vw = params["min_width_value"]
max_vw = params["max_width_value"]
mask_pad = params["mask_pad"]
### If the spectrum mask shows fewer than the minimum number of velocity channels of emission then skip spectrum fitting
if(sum(mask)<min_num_channels):
return [[-2,0,0],[0,0,0],1e9]
if(debug==1):
print( "##########")
print( "DEBUG MODE")
print( "##########")
print( " ")
### Get info about velocity
nv = len(x)
maxv=max(vel)
minv=min(vel)
dv = numpy.fabs(vel[1] - vel[0])
####### Determine the noise level
if(var_noise == 1):
noise = numpy.std(x[mask==0])
else:
noise = noise_level
### Pad the mask so that a some noise channels on either side of the emission channels are still included in all the calculations
mask = pad_mask(mask,mask_pad)
#### prepare the data and convolve spectrum with Gaussian for peak determination.
spec = x[:]
gk = Gaussian1DKernel(smooth)
smo_spec = convolve(spec,gk)
### Work out the gradients of the spectrum
dspec = numpy.zeros_like(spec)
for ii in range(0,nv-1):
dspec[ii] = (smo_spec[ii+1]-smo_spec[ii])/dv
ddspec = numpy.zeros_like(spec)
for ii in range(0,nv-2):
ddspec[ii] = (dspec[ii+1]-dspec[ii])/dv
dddspec = numpy.zeros_like(spec)
for ii in range(0,nv-3):
dddspec[ii] = (ddspec[ii+1]-ddspec[ii])/dv
switch = numpy.zeros_like(dspec)
decrease = 0
### go through and work out the number of peaks
for ii in range(0,nv-2):
if(ddspec[ii+1] < ddspec[ii] and ddspec[ii+2] < ddspec[ii]):
decrease = 1
if(ddspec[ii+1] > ddspec[ii] and ddspec[ii+2] > ddspec[ii] and dddspec[ii]>0):
if(decrease==1 and (mask[ii]==1 or mask[ii+1]==1 or mask[ii-1]==1) and (spec[ii]> n*noise or spec[ii+1]> n*noise or spec[ii-1]> n*noise) ):
switch[ii] = 1
decrease = 0
### if there seems to be no peaks, something went wrong so just make a guess of a single component
if(sum(switch)<1):
guess = single_gauss_guess(vel,spec,mask)
### Maybe a bad pixel/too few emission channels even after padding so return non-convergence tag
if(numpy.isnan(guess[2]) or guess[0]<n*noise):
return [[-3,0,0],[0,0,0],1e9]
### If guess is ok then proceed
n_peaks = 1
pid=1
bound = (numpy.zeros(3*n_peaks),numpy.zeros(3*n_peaks))
bound[0][0] = n*noise
bound[1][0] = 2*max(spec) + n*noise
bound[0][1] = minv
bound[1][1] = maxv
bound[0][2] = min_vw
bound[1][2] = max_vw
if(guess[0]<n*noise):
guess[0]=1.01*n*noise
if(guess[1]>maxv):
guess[1]=maxv-dv
if(guess[1]<minv):
guess[1]=minv+dv
if(guess[2]>max_vw):
guess[2] = 0.99*max_vw
if(guess[2]<min_vw):
guess[2]=1.01*min_vw
else:
### here we set up the arrays that contain the guesses for the peaks' amplitudes and centriods and widths
index = numpy.linspace(0,nv-1,nv)
index = numpy.array(index,dtype=numpy.int)
pid = index[switch==1] + 1
pcent = vel[pid]
pamp = spec[pid]
pcent = numpy.array(pcent,dtype=numpy.double)
pamp = numpy.array(pamp,dtype=numpy.double)
high = numpy.zeros_like(spec)
high = numpy.array(high,dtype=numpy.int)
high[spec>0.5*min(pamp)] = numpy.int(1)
start=-1
finish=-1
psig = numpy.zeros(len(pid))
num=0
### Go through and work out a guess for the widths
for ii in range(0,nv-1):
if(ii==0 and high[0]==1):
start=ii
if(high[ii+1] == 1 and high[ii] == 0 and start==-1 and finish==-1):
start=ii
if(high[ii+1] == 0 and high[ii] == 1 and start!=-1):
finish = ii
ran = numpy.fabs(vel[finish] -vel[start])
l = len(pid[(pid<finish)*(pid>start)])
for jj in range(num,num+l):
psig[jj] = ran/(l*numpy.sqrt(8*numpy.log(2)))
num=num+l
if(debug==1):
print( "start = ", vel[start])
print( "finish = ", vel[finish])
start=-1
finish=-1
### if a guess width is smaller than the velocity resolution then we set it to the velocity resoution
psig[psig<min_vw] = 1.01 * min_vw
### If more peaks are guessed than maximum allowed peaks then keep on the highest amplitude peaks up to the max_peak number
n_peaks = len(pamp)
if(n_peaks>max_peaks):
if(debug==1):
print("More than", max_peaks,"peaks were found")
dum = numpy.argsort(pamp)
pamp = pamp[dum[-max_peaks:]]
psig = psig[dum[-max_peaks:]]
pcent = pcent[dum[-max_peaks:]]
pid = pid[dum[-max_peaks:]]
n_peaks=max_peaks
### set limits on the guess and fill the guess and boundary arrays
guess = numpy.zeros(3*n_peaks)
bound = (numpy.zeros(3*n_peaks),numpy.zeros(3*n_peaks))
guess = numpy.array(guess,dtype=numpy.double)
for ii in range(0,n_peaks):
if(pamp[ii]<n*noise):
pamp[ii]=1.01*n*noise
if(pcent[ii]>maxv):
pcent[ii]=maxv-dv
if(pcent[ii]<minv):
pcent[ii]=minv+dv
if(psig[ii]>max_vw):
psig[ii] = 0.99*max_vw
if(psig[ii]<min_vw):
psig[ii]=1.01*min_vw
guess[3*ii] = pamp[ii]
guess[3*ii+1] = pcent[ii]
guess[3*ii+2] = psig[ii]
bound[0][3*ii] = n*noise
bound[1][3*ii] = 2*max(spec) + n*noise
bound[0][3*ii+1] = minv
bound[1][3*ii+1] = maxv
bound[0][3*ii+2] = min_vw
bound[1][3*ii+2] = max_vw
if(debug==1):
print( "######## Guess values #########")
print( "Number of peaks = ", n_peaks)
print( "Peak ids = ", pid)
print( "Peak centroids = ", guess[1::3])
print( "Peak amplitude = ", guess[::3])
print( "Peak width = ", guess[2::3])
print( " ")
### Fit for the first time using the initial guesses
nit=0
co_eff, errors, converged, model, AIC = fit_guess(vel,spec,mask,guess,bound,noise)
keep_going=1
### Debug display information and if no convergence is reached then try a single component guess and fit again
if(debug==1):
print("##### After first fit #####")
print("Co_eff = ", co_eff)
print("AIC = ", AIC)
print(" ")
### No convergence is found after the first fit, try to simplify and try again with a single component
if(converged!=1):
if(debug==1):
print("First fit did not converge")
print("Guess parameters = ",guess)
print("Noise = %.3lf, velocity channel = %.3lf" %(noise, dv))
guess = single_gauss_guess(vel,spec,mask)
### Maybe a bad pixel/too few emission channels even after padding so return non-convergence tag
if(numpy.isnan(guess[2]) or guess[0]<n*noise):
return [[-3,0,0],[0,0,0],1e9]
### Else proceed
n_peaks = 1
bound = (numpy.zeros(3*n_peaks),numpy.zeros(3*n_peaks))
bound[0][0] = n*noise
bound[1][0] = 2*max(spec) + n*noise
bound[0][1] = minv
bound[1][1] = maxv
bound[0][2] = min_vw
bound[1][2] = max_vw
if(guess[0]<n*noise):
guess[0]=1.05*n*noise
if(guess[1]>maxv):
guess[1]=maxv-dv
if(guess[1]<minv):
guess[1]=minv+dv
if(guess[2]>max_vw):
guess[2] = 0.99*max_vw
if(guess[2]<min_vw):
guess[2]=1.05*min_vw
### Try a fit with this new guess
co_eff, errors, converged, model, AIC = fit_guess(vel,spec,mask,guess,bound,noise)
### If still not converging then exit for this spectrum
if(converged!=1):
print("Fit did not converge")
return [[-1,0,0],[0,0,0],1e9]
### Until the AIC minimum is found or a sufficient number of iterations are performed keep adding and removing components to find optimal solution
while(keep_going==1 and nit<max_it and converged==1):
### Reset the AICs and co_effs
AIC_m = 1e9
AIC_l = 1e9
co_eff_m = co_eff
co_eff_l = co_eff
check=0
#### Check for boundary components
check = check_for_boundary(co_eff,bound)
if(check==1):
### If there is a boundary component, remove it and get new guesses without it
guess_new, bound_new = remove_boundary_components(co_eff,bound)
### If the new guess was reasonable fit using it
if(guess_new[0]>0):
co_eff_n,errors_n,converged_n,model_n,AIC_n = fit_guess(vel,spec,mask,guess_new,bound_new,noise)
### If the fit using the new guess without the boundary component is converged we take it
if(converged_n==1):
n_peaks = int(len(co_eff_n[::3]))
co_eff = co_eff_n
errors = errors_n
converged = converged_n
model = model_n
AIC = AIC_n
guess = guess_new
bound = bound_new
### Add an additional component if the maximum number of peaks hasn't been reached and fit again
if(n_peaks<max_peaks):
guess_m, bound_m = add_component(vel,spec,mask,model,co_eff,bound)
co_eff_m, errors_m, converged_m, model_m, AIC_m = fit_guess(vel,spec,mask,guess_m,bound_m,noise)
else:
AIC_m = 1e9
### If there are currently more than 1 peaks, remove one of the peaks and fit again
if(n_peaks>1):
guess_l, bound_l = remove_component(vel,spec,mask,model,co_eff,bound)
co_eff_l, errors_l, converged_l, model_l, AIC_l = fit_guess(vel,spec,mask,guess_l,bound_l,noise)
else:
AIC_l = 1e9
### Debug info
if(debug==1):
print("##### After %d fit #####" %(nit+1))
print("Keep going check = %d, Number of peaks = %d" %(keep_going, n_peaks))
print("AIC of npeaks = %.3lf, AIC of npeaks-1 = %.3lf, AIC of npeaks+1 = %.3lf" %(AIC,AIC_l,AIC_m))
print("Co_eff of npeaks:", co_eff)
print("Co_eff of npeaks-1:", co_eff_l)
print("Co_eff of npeaks+1:", co_eff_m)
print(" ")
### Current fit has a better AIC than both the added and removed component fits, thus we stop the iterations
if((AIC + daic < AIC_l and AIC <= AIC_m + daic)):
keep_going=0
### Removing a component leads to better fit than both the added component and current fits. Repeat again with this new fit
if(AIC_l <= AIC + daic):
co_eff = co_eff_l
errors = errors_l
converged = converged_l
model = model_l
AIC = AIC_l
guess = guess_l
bound = bound_l
n_peaks = n_peaks - 1
keep_going = 1
### Adding a component leads to a better fit than both the current and removed component fits. Repeat again with this new fit
if(AIC_m + daic < AIC and AIC_l>AIC+daic):
co_eff = co_eff_m
errors = errors_m
converged = converged_m
model = model_m
AIC = AIC_m
guess = guess_m
bound = bound_m
n_peaks = n_peaks + 1
keep_going = 1
### Add an iteration to the counter
nit = nit+1
### If there is no convergence at any point return error
if(converged!=1):
print("Fit did not converge")
return [[-1,0,0],[0,0,0],1e9]
### If the number of iterations is too high then return best fit so far and inform user
if(nit==max_it):
#print("Took lots of iterations")
return co_eff, errors, AIC
return co_eff, errors, AIC
### Function to pad the mask around the emission channels
def pad_mask(mask,mask_pad):
nv = len(mask)
ii = mask_pad
while(ii<nv-mask_pad):
if(mask[ii]==1 and mask[ii-1]==0):
mask[ii-mask_pad:ii] = 1
ii = ii + 1
continue
elif(mask[ii]==0 and mask[ii-1]==1):
mask[ii:ii+mask_pad] = 1
ii = ii + mask_pad + 1
continue
else:
ii = ii + 1
return mask
### A function to determine a single component guess to help poor convergence spectra
def single_gauss_guess(vel,spec,mask):
# Consider only parts of the spectra masked as emission
vv = vel[mask==1]
ss = spec[mask==1]
# Discard channels with negative intensity values to avoid biasing moment estimates
vv = vv[ss>0]
ss = ss[ss>0]
# Calculate moments 1 and 2 and use them as estimates of the centroid and width of the single component guess
mom1, mom2 = weighted_avg_and_std(vv,ss)
guess = numpy.zeros(3,dtype=numpy.double)
dv = vel[1]-vel[0]
mom1_index = int((mom1 - vel[0])/dv) + 1
guess[0] = spec[mom1_index]
guess[1] = mom1
guess[2] = mom2
return guess
### Function to return N superimposed Gaussian functions
def multi_gauss(vel,params):
N = int(len(params)/3)
a,b,c = numpy.array(params[::3]),numpy.array(params[1::3]),numpy.array(params[2::3])
y = numpy.zeros_like(vel)
for ii in range(0,N):
y = y + a[ii]*numpy.exp(-(vel-b[ii])**2 / (2*c[ii]**2))
return y
### Function to fit a N Gaussian function
def fit_guess(vel,spec,mask,guess,bound,noise):
try:
co_eff, var_matrix = curve_fit(lambda vel, *guess: multi_gauss(vel,guess),vel,spec,p0=guess,method="trf",bounds=bound)
errors = numpy.sqrt(numpy.diag(var_matrix))
converged = 1
model = multi_gauss(vel,co_eff)
AIC = calc_AIC(vel,spec,co_eff,model,mask,noise)
if(AIC==1e9):
converged=0
except RuntimeError:
co_eff = numpy.zeros_like(guess)
errors = numpy.zeros_like(guess)
converged=0
model = numpy.zeros_like(vel)
AIC = 1e9
return co_eff, errors, converged, model, AIC
### Calculate the corrected AIC using only the velocity channels with emission
def calc_AIC(vel,spec,co_eff,model,mask,noise):
res = spec - model
res = res[mask==1]
### Number of data points equal to number of velocity channels used to calculate residuals
### Number of parameters of the model is equal to the number of parameters for the fitting
n = len(res)
k = int(len(co_eff))
### If there are too few data points to calculate the correction factor then return extremely high AIC
if(n-k-1 < 1):
AIC=1e9
else:
AIC = 2*k + n*numpy.log(2*numpy.pi) + n*numpy.log(noise**2) + (sum(res**2))/(noise**2) + (2*k**2 + 2*k)/(n-k-1)
return AIC
### Add a component by finding the location of the maximum residual
def add_component(vel,spec,mask,model,guess,bound):
res = spec - model
res[mask==0] = 0
### Expand bound array first
n_peaks = int(len(guess[::3]))
bound2 = (numpy.zeros(3*n_peaks + 3),numpy.zeros(3*n_peaks + 3))
for ii in range(0,n_peaks+1):
bound2[0][3*ii] = bound[0][0]
bound2[1][3*ii] = bound[1][0]
bound2[0][3*ii+1] = bound[0][1]
bound2[1][3*ii+1] = bound[1][1]
bound2[0][3*ii+2] = bound[0][2]
bound2[1][3*ii+2] = bound[1][2]
### Now do guess
guess2 = numpy.zeros(len(guess) + 3)
guess2[:3*n_peaks] = guess[:3*n_peaks]
### Find place with highest residual
guess2[3*n_peaks] = numpy.amax(res)
high_res = numpy.argmax(res)
guess2[3*n_peaks+1] = vel[high_res]
guess2[3*n_peaks+2] = 4.01*bound[0][2]
### Do some double checks with the guessed parameters to ensure they lie within the bounds.
if(guess2[3*n_peaks]<bound2[0][0]):
guess2[3*n_peaks] = 1.5 * bound2[0][0]
if(guess2[3*n_peaks]>bound2[1][0]):
guess2[3*n_peaks] = 0.9 * bound2[1][0]
return guess2,bound2
### Remove a component by removing the smallest amplitude component
def remove_component(vel,spec,mask,model,guess,bound):
### Reduce bound array first
n_peaks = int(len(guess[::3]))
bound2 = (numpy.zeros(3*n_peaks - 3),numpy.zeros(3*n_peaks - 3))
for ii in range(0,n_peaks-1):
bound2[0][3*ii] = bound[0][0]
bound2[1][3*ii] = bound[1][0]
bound2[0][3*ii+1] = bound[0][1]
bound2[1][3*ii+1] = bound[1][1]
bound2[0][3*ii+2] = bound[0][2]
bound2[1][3*ii+2] = bound[1][2]
### Extract the three parameters of the Gaussian fits
pamp = guess[::3]
pcent = guess[1::3]
psig = guess[2::3]
### Find the two components which are closest together and pick the lower amplitude one.
diff_v = 1e9
min_diff_v = 0
for ii in range(0,n_peaks):
for jj in range(ii+1,n_peaks):
ddv = numpy.fabs(pcent[ii] - pcent[jj])
if(ddv<diff_v):
diff_v = ddv
if(pamp[ii]<pamp[jj]):
min_diff_v=ii
else:
min_diff_v=jj
guess2 = numpy.zeros(len(guess) - 3)
ind = numpy.arange(0,n_peaks,1)
guess2[::3] = pamp[ind!=min_diff_v]
guess2[1::3] = pcent[ind!=min_diff_v]
guess2[2::3] = psig[ind!=min_diff_v]
return guess2,bound2
### Check if the co-efficients are too close to the minimum amplitude or width boundaries. Typical of bad fits
def check_for_boundary(co_eff,bound):
q = numpy.sum(co_eff[::3] < 1.02*bound[0][0]) + numpy.sum(co_eff[2::3] < 1.02*bound[0][2])
if(q>0):
check_bound=1
else:
check_bound=0
return check_bound
### Remove the components which are close to the minimum amplitude or width boundaries
def remove_boundary_components(co_eff,bound):
#### First check the amplitudes
q = numpy.sum(co_eff[::3] < 1.02*bound[0][0])
### Check is every component has a tiny amplitude
if(q==len(co_eff[::3])):
### If there is only one component we can't remove it so return it
if(q==1):
return co_eff, bound
### If there is more than one component then remove all but the first and try again
if(q>1):
return co_eff[:3], [bound[0][:3],bound[1][:3]]
### There are components with tiny amplitudes, but not all
if(q>0):
### Make new bounds array
n_peaks = len(co_eff[::3]) - q
bound2 = (numpy.zeros(3*n_peaks),numpy.zeros(3*n_peaks))
for ii in range(0,n_peaks):
bound2[0][3*ii] = bound[0][0]
bound2[1][3*ii] = bound[1][0]
bound2[0][3*ii+1] = bound[0][1]
bound2[1][3*ii+1] = bound[1][1]
bound2[0][3*ii+2] = bound[0][2]
bound2[1][3*ii+2] = bound[1][2]
### Copy over the components which have amplitudes above the minimum
guess2 = numpy.zeros(3*n_peaks)
p = co_eff[::3] > 1.02*bound[0][0]
guess2[::3] = co_eff[::3][p]
guess2[1::3] = co_eff[1::3][p]
guess2[2::3] = co_eff[2::3][p]
co_eff = guess2
bound=bound2
### Now check the widths
q = numpy.sum(co_eff[2::3]<1.02*bound[0][2])
### Check is every component has a tiny width
if(q==len(co_eff[::3])):
### If there is only one component we can't remove it so return it
if(q==1):
return co_eff, bound
### If there is more than one component then remove all but the first and try again
if(q>1):
return co_eff[:3] , [bound[0][:3],bound[1][:3]]
### There are components with tiny amplitudes, but not all
if(q>0):
### Make new bounds array
n_peaks = len(co_eff[::3]) - q
bound2 = (numpy.zeros(3*n_peaks),numpy.zeros(3*n_peaks))
for ii in range(0,n_peaks):
bound2[0][3*ii] = bound[0][0]
bound2[1][3*ii] = bound[1][0]
bound2[0][3*ii+1] = bound[0][1]
bound2[1][3*ii+1] = bound[1][1]
bound2[0][3*ii+2] = bound[0][2]
bound2[1][3*ii+2] = bound[1][2]
### Copy over the components which have amplitudes above the minimum
guess2 = numpy.zeros(3*n_peaks)
p = co_eff[2::3] > 1.02*bound[0][2]
guess2[::3] = co_eff[::3][p]
guess2[1::3] = co_eff[1::3][p]
guess2[2::3] = co_eff[2::3][p]
co_eff = guess2
bound=bound2
return co_eff,bound
##### Test run functions, used to help determine the optimal values for the 3 important parameters: chi_limit, smoothing_length and signal to noise ratio
### The first is for the single Gaussian test, used to determine the average error on the fitting parameters
def single_gaussian_test(param):
### Unpack the parameter array
num_test = param["test_number"]
spec_min = param["test_spec_min"]
spec_max = param["test_spec_max"]
spec_nv = param["test_spec_num"]
noise_level = param["test_noise"]
amp_min = param["test_amplitude_min"]
amp_max = param["test_amplitude_max"]
cen_min = param["test_vel_cen_min"]
cen_max = param["test_vel_cen_max"]
wid_min = param["test_width_min"]
wid_max = param["test_width_max"]
plot_tag = param["test_plot_tag"]
var_noise = param["variable_noise"]
if(var_noise!=0):
print("variable_noise should be set to 0 for this test. Setting it to 0 now.")
param["variable_noise"] = 0
### Construct the velocity array
v = numpy.linspace(spec_min,spec_max,spec_nv)
### Initiize the seed for the random number generators
numpy.random.seed(4)
### Counters
n=0
n2=0
### Empty lists to store the errors on the fitting parameters
a_e = []
c_e = []
w_e = []
c_chi = []
### Loop over the number of tests specified
print( "#######################")
print( "#### Test progress ####")
print( "#######################")
print( " ")
for ii in range(0,num_test):
if((ii+1)%numpy.int(num_test/10.)==0):
print( "The test is %.2f %% complete" %(100*(ii+1)/numpy.float(num_test)))
### Pick a random amplitude, centroid and width
amp = numpy.random.rand(1)*(amp_max - amp_min) + amp_min
cen = numpy.random.rand(1)*(cen_max - cen_min) + cen_min
width = numpy.random.rand(1)*(wid_max - wid_min) + wid_min
### Produce Gaussian and add noise
y = multi_gauss(v,[amp,cen,width])
mask = numpy.zeros_like(y)
mask[y>0.1*numpy.amax(y)] = 1
no = numpy.random.normal(loc=0.0,scale=noise_level,size=spec_nv)
y = y+no
co_eff,errors,AIC =fit_single_line(v,y,mask,param)
### Check that a fit was found
if(co_eff[0]==-1):
n = n + 1
print( "No line was detected")
print( "The amplitude, centroid and width was: ", amp, cen, width )
### If there was a fit, check for the number of components found
else:
### If only a single Gaussian found, store the errors on the parameters
if(len(co_eff)==3):
a_e.append(numpy.fabs(amp-co_eff[::3])/amp)
c_e.append(numpy.fabs((cen-co_eff[1::3])/cen))
w_e.append(numpy.fabs(width-co_eff[2::3])/width)
### If the plot tag is turned on, plot the input parameter against the fitted parameters
if(plot_tag==1):
matplotlib.pyplot.figure(1)
matplotlib.pyplot.plot(amp,co_eff[0],"kx")
matplotlib.pyplot.figure(2)
matplotlib.pyplot.plot(cen,co_eff[1],"kx")
matplotlib.pyplot.figure(3)
matplotlib.pyplot.plot(width,co_eff[2],"kx")
### More than one component was detected.
else:
n2 = n2 + 1
print( "More than one component was detected")
print( "The input components were: ", amp, cen, width)
print( "The output amplitudes, centroids and widths were: ", co_eff[0::3], co_eff[1::3], co_eff[2::3])
print( "Test spectrum ID: ", ii)
### print out the median errors and the interquartile range
a_e = 100*numpy.array(a_e)
c_e = 100*numpy.array(c_e)
w_e = 100*numpy.array(w_e)
print( " ")
print( " ")
print( " ")
print( "##############################################")
print( "#### Results for the single Gaussian test ####")
print( "##############################################")
print( "Median error on amplitude as a percentage = %.2f + %.2f - %.2f" %(numpy.percentile(a_e, 50), (numpy.percentile(a_e,75)-numpy.percentile(a_e,50)), (numpy.percentile(a_e,50)-numpy.percentile(a_e,25))))
print( "Median error on centroids as a percentage = %.2f + %.2f - %.2f" %(numpy.percentile(c_e, 50), (numpy.percentile(c_e,75)-numpy.percentile(c_e,50)), (numpy.percentile(c_e,50)-numpy.percentile(c_e,25))))
print( "Median error on widths as a percentage = %.2f + %.2f - %.2f" %(numpy.percentile(w_e, 50), (numpy.percentile(w_e,75)-numpy.percentile(w_e,50)) , (numpy.percentile(w_e,50)-numpy.percentile(w_e,25))))
print( "The number of spectra identified to have more than 1 component was: %i (%.2f%%)" %(n2, n2/num_test * 100))
if(plot_tag==1):
matplotlib.pyplot.figure(1)
matplotlib.pyplot.xlabel("Input amplitude")
matplotlib.pyplot.ylabel("Output amplitude")
matplotlib.pyplot.figure(2)
matplotlib.pyplot.xlabel("Input centroid")
matplotlib.pyplot.ylabel("Output centroid")
matplotlib.pyplot.figure(3)
matplotlib.pyplot.xlabel("Input width")
matplotlib.pyplot.ylabel("Output width")
matplotlib.pyplot.show()
return
### Here we now start the multiple component test. This is to test the code's ability to determine the number of components in test spectra.
def multi_gaussian_test(param):
### Unpack the parameter array
num_test = param["test_number"]
spec_min = param["test_spec_min"]
spec_max = param["test_spec_max"]
spec_nv = param["test_spec_num"]
noise_level = param["test_noise"]
amp_min = param["test_amplitude_min"]
amp_max = param["test_amplitude_max"]
cen_min = param["test_vel_cen_min"]
cen_max = param["test_vel_cen_max"]
wid_min = param["test_width_min"]
wid_max = param["test_width_max"]
plot_tag = param["test_plot_tag"]
var_noise = param["variable_noise"]
if(var_noise!=0):
print("variable_noise should be set to 0 for this test. Setting it to 0 now.")
param["variable_noise"] = 0
### Construct the velocity array
v = numpy.linspace(spec_min,spec_max,spec_nv)
### Initiize the seed for the random number generators
numpy.random.seed(4)
### arrays to store the input number of peaks and the output number of peaks
ni = numpy.zeros(num_test)
nf = numpy.zeros(num_test)
count = 0
print( "#######################")
print( "#### Test progress ####")
print( "#######################")
print( " ")
for ii in range(0,num_test):
if((ii+1)%numpy.int(num_test/10.)==0):
print( "The test is %.2f %% complete" %(100*(ii+1)/numpy.float(num_test)))
### determine the number of peaks, between 1 and 4
npeak = numpy.random.rand(1)*4 + 1
npeak = int(npeak)
ni[ii] = npeak
### arrays for amplitudes, centroids and widths of the components
amp = numpy.zeros(npeak)
cen = numpy.zeros(npeak)
width = numpy.zeros(npeak)
nc = 0
iteration = 0
### This while loop is to ensure that there are no unresolved components in the test spectra.
while(nc<npeak):
if(iteration>50):
ni[ii] = 0
break
iteration = iteration + 1
pass_num = 0
amp[nc] = numpy.random.rand(1)*(amp_max - amp_min) + amp_min
cen[nc] = numpy.random.rand(1)*(cen_max - cen_min) + cen_min
width[nc] = numpy.random.rand(1)*(wid_max - wid_min) + wid_min
if(nc>0):
### check between the new component and previous saved components to see if they are sufficiently far away to be resolved.
for jj in range(0,nc):
if(numpy.fabs(cen[nc] - cen[jj]) > 0.55*2.35*(width[nc] + width[jj])):
pass_num = pass_num + 1
### if the new component is resolved with respect to all the previous components then it is passed and may be saved
if(pass_num == nc):
nc = nc + 1
else:
nc = nc + 1
if(iteration>50):
nf[ii] = 0
continue
test_coeff = numpy.zeros(3*npeak)
test_coeff[::3] = amp
test_coeff[1::3] = cen
test_coeff[2::3] = width
y = multi_gauss(v,test_coeff)
mask = numpy.zeros_like(y)
no = numpy.random.normal(loc=0.0,scale=noise_level,size=spec_nv)
y = y+no
mask[y>3*noise_level] = 1
### Fit this synthetic spectrum
co_eff,errors,AIC =fit_single_line(v,y,mask,param)
### Check that a fit was found
if(co_eff[0]==-1):
nf[ii] = 0
print( "No components detected")
print( "The amplitude, centroid and width were: ", amp, cen, width)
### If a fit was found then determine the number of components and find it.
else:
nf[ii] = len(co_eff[::3])
### If the number of fitted components is not equal to the input number store this information.
if(ni[ii]!=nf[ii]):
count = count+1
print("")
print("The number of fitted components does not equal that of the input spectrum")
print("Co-efficients for the input spectrum:")
print(test_coeff)
print("Co-efficients for the model spectrum:")
print(co_eff)
print("")
print( " ")
print( " ")
print( " ")
print( "################################################")
print( "#### Results for the multiple Gaussian test ####")
print( "################################################")
print( "%d test spectra were fitted with the wrong number of components out of %d tests" %(count,len(ni[ni>0])))
print( "Thus the code has a %.2f %% success rate for these parameters" %(100*(1-count/numpy.float(len(ni[ni>0])))))
return
##### Function for reading parameter file
def read_parameters(param_file):
### The dictionaries for the type of variable and the variable itself
type_of_var = {"debug" : "int",
"smoothing_length" : "float",
"variable_noise" : "int",
"noise_level" : "float",
"signal_to_noise_ratio" : "float",
"max_peaks" : "int",
"max_iterations" : "int",
"min_velocity_channels" : "int",
"min_width_value" : "float",
"max_width_value" : "float",
"mask_pad" : "int",
"delta_AIC_limit" : "float",
"output_base" : "str",
"velocity_channel_number" : "int",
"upper_sigma_level" : "float",
"lower_sigma_level" : "float",
"mask_filter_size" : "int",
"use_velocity_range" : "int",
"min_velocity_range" : "float",
"max_velocity_range" : "float",
"test_number" : "int",
"test_spec_min" : "float",
"test_spec_max" : "float",
"test_spec_num" : "int",
"test_noise" : "float",
"test_amplitude_min" : "float",
"test_amplitude_max" : "float",
"test_width_min" : "float",
"test_width_max" : "float",
"test_vel_cen_min" : "float",
"test_vel_cen_max" : "float",
"test_plot_tag" : "int",
"data_in_file_name" : "str"}
param = {"debug" : 0,
"smoothing_length" : 3.0,
"variable_noise" : 0,
"noise_level" : 0.1,
"signal_to_noise_ratio" : 5,
"max_peaks" : 3,
"max_iterations" : 5,
"min_velocity_channels" : 3,
"min_width_value" : 0.1,
"max_width_value" : 20.0,
"mask_pad" : 2,