"""Stoner.Core provides the core classes for the Stoner package."""
__all__ = [
"StonerLoadError",
"StonerSetasError",
"_setas",
"regexpDict",
"typeHintedDict",
"metadataObject",
"DataArray",
"DataFile",
]
import re
import io
import copy
import pathlib
from collections.abc import MutableSequence, Mapping, Iterable
import inspect as _inspect_
from importlib import import_module
from textwrap import TextWrapper
import numpy as np
from numpy import NaN # NOQA pylint: disable=unused-import
from numpy import ma
from .compat import string_types, int_types, index_types, _pattern_type, path_types
from .core import _setas, regexpDict, typeHintedDict, metadataObject
from .core.array import DataArray
from .core.exceptions import StonerLoadError, StonerSetasError
from .core.operators import DataFileOperatorsMixin
from .core.property import DataFilePropertyMixin
from .core.interfaces import DataFileInterfacesMixin
from .core.methods import DataFileSearchMixin
from .tools import all_type, isiterable, isLikeList, get_option, isclass, copy_into
from .tools.file import file_dialog
try:
from tabulate import tabulate
tabulate.PRESERVE_WHITESPACE = True
except ImportError:
tabulate = None
try:
import pandas as pd
except ImportError:
pd = None
class DataFile(
DataFileSearchMixin,
DataFileInterfacesMixin,
DataFileOperatorsMixin,
DataFilePropertyMixin,
metadataObject,
MutableSequence,
):
"""Base class object that represents a matrix of data, associated metadata and column headers.
Attributes:
column_headers (list):
list of strings of the column names of the data.
data (2D numpy masked array):
            The attribute that stores the numerical data for each DataFile. This is a :py:class:`DataArray` instance -
which is itself a subclass of :py:class:`numpy.ma.MaskedArray`.
title (string):
The title of the measurement.
filename (string):
The current filename of the data if loaded from or already saved to disc. This is the default filename
used by the :py:meth:`Stoner.Core.DataFile.load` and :py:meth:`Stoner.Core.DataFile.save`.
header (string):
            A read-only property that returns a pretty-formatted string giving the header of the tabular representation.
mask (array of booleans):
Returns the current mask applied to the numerical data equivalent to self.data.mask.
mime_type (list of str):
The possible mime-types of data files represented by each matching filename pattern in
            :py:attr:`DataFile.patterns`.
patterns (list):
            A list of filename extension glob patterns that matches the expected filename patterns for a DataFile
            (*.txt and *.tdi).
priority (int):
            Used to indicate the order in which subclasses of :py:class:`DataFile` are tried when loading data. A higher
number means a lower priority (!)
        setas (:py:class:`_setas`):
Defines certain columns to contain X, Y, Z or errors in X,Y,Z data.
shape (tuple of integers):
Returns the shape of the data (rows,columns) - equivalent to self.data.shape.
records (numpy record array):
            Returns the data in the form of a list of tuples where each tuple maps to the column names.
clone (DataFile):
            Creates a deep copy of the :py:class:`DataFile` object.
dict_records (array of dictionaries):
            View the data as an array of dictionaries where each dictionary represents one row with keys derived
from column headers.
dims (int):
When data columns are set as x,y,z etc. returns the number of dimensions implied in the data set
        dtype (numpy dtype):
Returns the datatype stored in the :py:attr:`DataFile.data` attribute.
T (:py:class:`DataArray`):
Transposed version of the data.
subclasses (list):
Returns a list of all the subclasses of DataFile currently in memory, sorted by
            their :py:attr:`Stoner.Core.DataFile.priority`. Each entry in the list consists of the
string name of the subclass and the class object.
xcol (int):
If a column has been designated as containing *x* values, this will return the index of that column
xerr (int):
Similarly to :py:attr:`DataFile.xcol` but for the x-error value column.
ycol (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the y value columns.
yerr (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the y error value columns.
zcol (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the z value columns.
zerr (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the z error value columns.
ucol (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the u (x-axis direction cosine) columns.
vcol (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the v (y-axis direction cosine) columns.
wcol (list of int):
Similarly to :py:attr:`DataFile.xcol` but for the w (z-axis direction cosine) columns.
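
    Examples:
        A minimal sketch of constructing a DataFile from a numeric array and
        designating columns (the headers and values here are illustrative)::

            >>> import numpy as np
            >>> d = DataFile(np.column_stack((np.arange(5), np.arange(5) ** 2)))
            >>> d.column_headers = ["x data", "y data"]
            >>> d.setas = "xy"
            >>> d.shape
            (5, 2)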
"""
#: priority (int): is the load order for the class, smaller numbers are tried before larger numbers.
# .. note::
#
# Subclasses with priority<=32 should make some positive identification that they have the right
# file type before attempting to read data.
priority = 32
#: pattern (list of str): A list of file extensions that might contain this type of file. Used to construct
# the file load/save dialog boxes.
_patterns = ["*.txt", "*.tdi"] # Recognised filename patterns
# mimetypes we match
mime_type = ["text/plain"]
_conv_string = np.vectorize(str)
_conv_float = np.vectorize(float)
# ====================================================================================
############################ Object Construction ###############################
# ====================================================================================
def __new__(cls, *args, **kargs):
"""Prepare the basic DataFile instance before the mixins add their bits."""
self = metadataObject.__new__(cls)
object.__setattr__(self, "debug", kargs.pop("debug", False))
self._masks = [False]
self._filename = None
object.__setattr__(self, "_data", DataArray([]))
self._baseclass = DataFile
self._kargs = kargs
return self
def __init__(self, *args, **kargs):
"""Initialise the DataFile from arrays, dictionaries and filenames.
Various forms are recognised:
.. py:function:: DataFile('filename',<optional filetype>,<args>)
:noindex:
            Creates the new DataFile object and then executes the :py:meth:`DataFile.load`
method to load data from the given *filename*.
.. py:function:: DataFile(array)
:noindex:
Creates a new DataFile object and assigns the *array* to the
:py:attr:`DataFile.data` attribute.
.. py:function:: DataFile(dictionary)
:noindex:
            Creates the new DataFile object. If the dictionary keys are all strings and the values are all
            numpy 1D arrays of equal length, then assumes the dictionary represents columns of data and the keys
            are the column titles, otherwise initialises the metadata with the dictionary.
.. py:function:: DataFile(array,dictionary)
:noindex:
Creates the new DataFile object and does the combination of the
previous two forms.
.. py:function:: DataFile(DataFile)
:noindex:
Creates the new DataFile object and initialises all data from the
            existing :py:class:`DataFile` instance. On the face of it this does the same as
            the assignment operator, but is more useful when one or other of the
            DataFile objects is an instance of a subclass of DataFile.
Args:
args (positional arguments):
Variable number of arguments that match one of the definitions above
            kargs (keyword arguments):
All keyword arguments that match public attributes are used to set those public attributes.
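
        Examples:
            A sketch of two of the forms above - a bare numpy array, and an
            array plus a metadata dictionary (item access to the metadata is
            assumed from :py:class:`metadataObject`; values are illustrative)::

                >>> import numpy as np
                >>> d = DataFile(np.zeros((10, 3)))
                >>> d2 = DataFile(np.zeros((10, 3)), {"T_set": 4.2})
                >>> d2["T_set"]
                4.2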
"""
# init instance attributes
super().__init__(**kargs) # initialise self.metadata)
self._public_attrs = {
"data": np.ndarray,
"filetype": str,
"setas": (string_types, list, dict),
"column_headers": list,
"metadata": typeHintedDict,
"debug": bool,
"filename": string_types,
"mask": (np.ndarray, bool),
}
self._repr_limits = (256, 6)
handler = [lambda *args, **kargs: None, self._init_single, self._init_double, self._init_many][
min(len(args), 3)
]
self.mask = False
self.data._setas._get_cols()
handler(*args, **kargs)
try:
kargs = self._kargs
delattr(self, "_kargs")
except AttributeError:
pass
self.metadata["Stoner.class"] = type(self).__name__
if kargs: # set public attributes from keywords
to_go = []
for k, val in kargs.items():
if k in self._public_attrs:
if isinstance(val, self._public_attrs[k]):
self.__setattr__(k, val)
else:
self._raise_type_error(k)
to_go.append(k)
else:
raise AttributeError(f"{k} is not an allowed attribute of {self._public_attrs}")
for k in to_go:
del kargs[k]
if self.debug:
print("Done DataFile init")
# ============================================================================================
############################ Constructor Methods ###########################################
# ============================================================================================
def _init_single(self, *args, **kargs):
"""Handle constructor with 1 argument - called from __init__."""
arg = args[0]
inits = {
path_types + (bool, bytes, io.IOBase): self._init_load,
np.ndarray: self._init_array,
DataFile: self._init_datafile,
pd.DataFrame: self._init_pandas,
"Stoner.Image.core.ImageFile": self._init_imagefile,
Mapping: self._init_dict,
Iterable: self._init_list,
}
for typ, meth in inits.items():
if isinstance(typ, str):
parts = typ.split(".")
mod = import_module(".".join(parts[:-1]))
typ = getattr(mod, parts[-1])
if isinstance(arg, typ):
meth(arg, **kargs)
break
else:
raise TypeError(f"No constructor for {type(arg)}")
self.data._setas.cols.update(self.setas._get_cols())
def _init_double(self, *args, **kargs):
"""Two argument constructors handled here. Called form __init__."""
(arg0, arg1) = args
if isinstance(arg1, dict) or (isiterable(arg1) and all_type(arg1, string_types)):
self._init_single(arg0, **kargs)
self._init_single(arg1, **kargs)
elif (
isinstance(arg0, np.ndarray)
and isinstance(arg1, np.ndarray)
and len(arg0.shape) == 1
and len(arg1.shape) == 1
):
self._init_many(*args, **kargs)
def _init_many(self, *args, **kargs):
"""Handle more than two arguments to the constructor - called from init."""
for a in args:
if not (isinstance(a, np.ndarray) and a.ndim == 1):
copy_into(self.__class__.load(a, **kargs), self)
break
else:
self.data = np.column_stack(args)
def _init_array(self, arg, **kargs): # pylint: disable=unused-argument
"""Initialise from a single numpy array."""
# numpy.array - set data
if np.issubdtype(arg.dtype, np.number):
self.data = DataArray(np.atleast_2d(arg), setas=self.data._setas)
self.column_headers = [f"Column_{x}" for x in range(np.shape(arg)[1])]
elif isinstance(arg[0], dict):
for row in arg:
self += row
def _init_datafile(self, arg, **kargs): # pylint: disable=unused-argument
"""Initialise from datafile."""
for a in arg.__dict__:
if not callable(a) and a != "_baseclass":
super().__setattr__(a, copy.copy(getattr(arg, a)))
self.metadata = arg.metadata.copy()
self.data = DataArray(arg.data, setas=arg.setas.clone)
self.data.setas = arg.setas.clone
def _init_dict(self, arg, **kargs): # pylint: disable=unused-argument
"""Initialise from dictionary."""
if (
all_type(arg.keys(), string_types)
and all_type(arg.values(), np.ndarray)
and np.all([len(arg[k].shape) == 1 and np.all(len(arg[k]) == len(list(arg.values())[0])) for k in arg])
):
self.data = np.column_stack(tuple(arg.values()))
self.column_headers = list(arg.keys())
else:
self.metadata = arg.copy()
def _init_imagefile(self, arg, **kargs): # pylint: disable=unused-argument
"""Initialise from an ImageFile."""
x = arg.get("x_vector", np.arange(arg.shape[1]))
y = arg.get("y_vector", np.arange(arg.shape[0]))
x, y = np.meshgrid(x, y)
z = arg.image
self.data = np.column_stack((x.ravel(), y.ravel(), z.ravel()))
self.metadata = copy.deepcopy(arg.metadata)
self.column_headers = ["X", "Y", "Image Intensity"]
self.setas = "xyz"
def _init_pandas(self, arg, **kargs): # pylint: disable=unused-argument
"""Initialise from a pandas dataframe."""
self.data = arg.values
ch = []
for ix, col in enumerate(arg):
if isinstance(col, string_types):
ch.append(col)
elif isiterable(col):
for ch_i in col:
if isinstance(ch_i, string_types):
ch.append(ch_i)
break
else:
ch.append(f"Column {ix}")
else:
ch.append(f"Column {ix}:{col}")
self.column_headers = ch
self.metadata.update(arg.metadata)
if isinstance(arg.columns, pd.MultiIndex) and len(arg.columns.levels) > 1:
for label in arg.columns.get_level_values(1):
if label not in list("xyzdefuvw."):
break
else:
self.setas = list(arg.columns.get_level_values(1))
def _init_load(self, arg, **kargs):
"""Load data from a file-like source.
        Args:
            arg (str, PurePath, IOBase, bool):
                If arg is a str, PurePath or IOBase then open the file-like object and read it. If arg is bool
                and False, provide a dialog box instead.
"""
if isinstance(arg, bool):
if arg:
raise ValueError("Cannot construct a DataFile with a single argument of True")
elif isinstance(arg, pathlib.PurePath):
arg = str(arg)
copy_into(self.__class__.load(filename=arg, **kargs), self)
def _init_list(self, arg, **kargs):
"""Initialise from a list or other ioterable."""
if all_type(arg, string_types):
self.column_headers = list(arg)
elif all_type(arg, np.ndarray):
self._init_many(*arg, **kargs)
else:
raise TypeError(f"Unable to construct DataFile from a {type(arg)}")
# ============================================================================================
############################ Special Methods ###############################################
# ============================================================================================
def __call__(self, *args, **kargs):
"""Clone the DataFile, but allowing additional arguments to modify the new clone.
Args:
*args (tuple):
Positional arguments to pass through to the new clone.
**kargs (dict):
Keyword arguments to pass through to the new clone.
Raises:
TypeError: If a keyword argument doesn't match an attribute.
Returns:
new_d (DataFile):
Modified clone of the current object.
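
        Example:
            A sketch of cloning with a keyword override (*filename* is one of
            the public attributes registered in ``__init__``; the name used
            here is illustrative)::

                >>> d2 = d(filename="copy.txt")
                >>> d2 is d
                False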
"""
new_d = self.clone
handler = [lambda *args, **kargs: None, new_d._init_single, new_d._init_double, new_d._init_many][
            min(len(args), 3)
]
handler(*args, **kargs)
if kargs: # set public attributes from keywords
myattrs = new_d._public_attrs
for k in set(kargs.keys()) & set(myattrs.keys()):
if isinstance(kargs[k], myattrs[k]):
new_d.__setattr__(k, kargs[k])
else:
if isinstance(myattrs[k], tuple):
typ = "one of " + ",".join([str(type(t)) for t in myattrs[k]])
else:
typ = f"a {type(myattrs[k])}"
raise TypeError(f"{k} should be {typ} not a {type(kargs[k])}")
return new_d
def __deepcopy__(self, memo):
"""Provide support for copy.deepcopy to work."""
cls = type(self)
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
try:
setattr(result, k, copy.deepcopy(v, memo))
except (TypeError, ValueError, RecursionError):
setattr(result, k, copy.copy(v))
return result
def __dir__(self):
"""Return the attributes of the current object.
Augmenting the keys of self.__dict__ with the attributes that __getattr__ will handle.
"""
attr = dir(type(self))
col_check = {"xcol": "x", "xerr": "d", "ycol": "y", "yerr": "e", "zcol": "z", "zerr": "f"}
if not self.setas.empty:
for k, val in col_check.items():
if k.startswith("x"):
if k in self._data._setas.cols and self._data._setas.cols[k] is not None:
attr.append(val)
else:
if k in self._data._setas.cols and self._data._setas.cols[k]:
attr.append(val)
return sorted(set(attr))
def __getattr__(self, name):
"""Handle some special pseudo attributes that map to the setas columns.
Args:
name (string):
The name of the attribute to be returned.
Returns:
Various:
the DataFile object in various forms
Supported attributes:
- records:
return the DataFile data as a numpy structured
array - i.e. rows of elements whose keys are column headings
- clone:
returns a deep copy of the current DataFile instance
Otherwise the name parameter is tried as an argument to :py:meth:`DataFile.column` and the resultant column
is returned. If DataFile.column raises a KeyError this is remapped as an AttributeError.
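
        Example:
            A sketch of the column-lookup fallback (assumes a column headed
            "Temp" exists on the instance ``d``)::

                >>> d.column_headers = ["Temp", "R"]
                >>> (d.Temp == d.column("Temp")).all()
                True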
"""
setas_cols = ("x", "y", "z", "d", "e", "f", "u", "v", "w", "r", "q", "p")
if name != "debug" and self.debug:
print(name)
try:
return super().__getattr__(name)
except AttributeError:
ret = self.__dict__.get(name, type(self).__dict__.get(name, None))
if ret is not None:
return ret
if name in setas_cols:
ret = self._getattr_col(name)
if ret is not None:
return ret
if name in self.setas.cols:
ret = self.setas.cols[name]
if ret is not None and ret != []:
return ret
try:
col = self._data._setas.find_col(name)
return self.column(col)
except (KeyError, IndexError):
pass
if name in setas_cols: # Probably tried to use a setas col when it wasn't defined
raise StonerSetasError(
f"Tried accessing a {name} column, but setas is not defined and {name} is not a column name either"
)
raise AttributeError(f"{name} is not an attribute of DataFile nor a column name")
# def __reduce_ex__(self, p):
# """Machinery used for deepcopy."""
# cls=type(self)
# return (cls, (), self.__getstate__())
def __repr__(self):
"""Output the :py:class:`DataFile` object in TDI format.
        This allows one to print any :py:class:`DataFile` to a stream based
        object and generate a reasonable textual representation of the data.
Returns:
self in a textual format.
"""
if get_option("short_repr") or get_option("short_data_repr"):
return self._repr_short_()
try:
return self._repr_table_()
except (ImportError, ValueError, TypeError):
return self.__repr_core__(256)
def __setattr__(self, name, value):
"""Handle attempts to set attributes not covered with class attribute variables.
Args:
name (str):
Name of attribute to set. Details of possible attributes below:
- mask Passes through to the mask attribute of self.data (which is a numpy masked array).
                    Also handles the case where you pass a callable object to mask, where we pass each row to the
                    function and use the return result as the mask.
- data Ensures that the :py:attr:`data` attribute is always a :py:class:`numpy.ma.maskedarray`
"""
if hasattr(type(self), name) and isinstance(getattr(type(self), name), property):
super().__setattr__(name, value)
elif len(name) == 1 and name in "xyzuvwdef" and self.setas[name]:
self._setattr_col(name, value)
else:
super().__setattr__(name, value)
def __str__(self):
"""Provide an implementation for str(DataFile) that does not shorten the output."""
return self.__repr_core__(False)
# ============================================================================================
############################ Private Methods #################################################
# ============================================================================================
def _col_args(self, *args, **kargs):
"""Create an object which has keys based either on arguments or setas attribute."""
return self.data._col_args(*args, **kargs) # Now just pass through to DataArray
def _getattr_col(self, name):
"""Get a column using the setas attribute."""
try:
return getattr(self._data, name)
except StonerSetasError:
return None
def _interesting_cols(self, cols):
"""Workout which columns the user might be interested in in the basis of the setas.
ArgsL
cols (float):
Maximum Number of columns to display
Returns
list(ints):
The indices of interesting columns with breaks in runs indicated by -1
"""
c = self.shape[1]
if c > cols:
interesting = []
last = -1
for ix, typ in enumerate(self.setas):
if last not in (-1, ix - 1):
interesting.append(-1)
last = -1
if typ != ".":
interesting.append(ix)
last = ix
if interesting and interesting[-1] == -1:
interesting = interesting[:-1]
if interesting:
c_start = max(interesting) + 1
else:
c_start = 0
interesting.extend(range(c_start, c))
if len(interesting) < cols:
cols = len(interesting)
if interesting[cols - 3] != -1:
interesting[cols - 2] = -1
else:
interesting[cols - 2] = c - 2
interesting[cols - 1] = c - 1
interesting = interesting[:cols]
c = cols
else:
interesting = list(range(c))
col_assignments = []
for i in interesting:
if i != -1:
if self.setas[i] != ".":
col_assignments.append(f"{i} ({self.setas[i]})")
else:
col_assignments.append(f"{i}")
else:
col_assignments.append("")
return interesting, col_assignments, cols
def __repr_core__(self, shorten=1000):
"""Actuall do the repr work, but allow for a shorten parameter to save printing big files out to disc."""
outp = "TDI Format 1.5\t" + "\t".join(self.column_headers) + "\n"
m = len(self.metadata)
self.data = np.atleast_2d(self.data)
r = np.shape(self.data)[0]
md = self.metadata.export_all()
for x in range(min(r, m)):
if self.data.ndim != 2 or self.shape[1] == 1:
outp += f"{md[x]}\t{self.data[x]}\n"
else:
outp = outp + md[x] + "\t" + "\t".join([str(y) for y in self.data[x].filled()]) + "\n"
if m > r: # More metadata
for x in range(r, m):
outp = outp + md[x] + "\n"
elif r > m: # More data than metadata
if shorten is not None and shorten and r - m > shorten:
for x in range(m, m + shorten - 100):
if self.data.ndim != 2 or self.shape[1] == 1:
outp += "\t" + f"\t{self.data[x]}\n"
else:
outp += "\t" + "\t".join([str(y) for y in self.data[x].filled()]) + "\n"
outp += f"... {r - m - shorten + 100} lines skipped...\n"
for x in range(-100, -1):
if self.data.ndim != 2 or self.shape[1] == 1:
outp += f"\t\t{self.data[x]}\n"
else:
outp += "\t" + "\t".join([str(y) for y in self.data[x].filled()]) + "\n"
else:
for x in range(m, r):
if self.data.ndim != 2 or self.shape[1] == 1:
outp += f"\t\t{self.data[x]}\n"
else:
outp = outp + "\t" + "\t".join([str(y) for y in self.data[x].filled()]) + "\n"
return outp
def _repr_html_private(self):
"""Version of repr_core that does and html output."""
return self._repr_table_("html")
    def _repr_short_(self):
        """Produce a short one-line summary of the DataFile."""
ret = (
f"{self.filename}({type(self)}) of shape {self.shape} ({''.join(self.setas)})"
+ f" and {len(self.metadata)} items of metadata"
)
return ret
def _repr_table_(self, fmt="rst"):
"""Convert the DataFile to a 2D array and then feed to tabulate."""
if tabulate is None:
raise ImportError("No tabulate.")
lb = "<br/>" if fmt == "html" else "\n"
rows, cols = self._repr_limits
r, c = self.shape
interesting, col_assignments, cols = self._interesting_cols(cols)
c = min(c, cols)
if len(interesting) > 0:
c_w = max([len(self.column_headers[x]) for x in interesting if x > -1])
else:
c_w = 0
wrapper = TextWrapper(subsequent_indent="\t", width=max(20, (80 - c_w * c)))
if r > rows:
shorten = [True, False]
r = rows + rows % 2
else:
shorten = [False, False]
shorten[1] = c > cols
r = max(len(self.metadata), r)
outp = np.zeros((r + 1, c + 1), dtype=object)
outp[:, :] = "..."
ch = [self.column_headers[ix] if ix >= 0 else "...." for ix in interesting]
for ix, (h, i) in enumerate(zip(ch, col_assignments)):
spaces1 = " " * ((c_w - len(h)) // 2)
spaces2 = " " * ((c_w - len(i)) // 2)
ch[ix] = f"{spaces1}{h}{lb}{spaces2}{i}"
if self.debug:
print(len(spaces1), len(spaces2))
outp[0, 1:] = ch
outp[1, 1:] = col_assignments
outp[0, 0] = f"TDI Format 1.5{lb}index"
i = 1
for md in self.metadata.export_all():
md = md.replace("=", "= ")
for line in wrapper.wrap(md):
if i >= outp.shape[0]: # pylint: disable=E1136
outp = np.append(outp, [[""] * outp.shape[1]], axis=0) # pylint: disable=E1136
outp[i, 0] = line
i += 1
for ic, c in enumerate(interesting):
if c >= 0:
if shorten[0]:
col_out = np.where(self.mask[: r // 2 - 1, c], "#####", self.data[: r // 2 - 1, c].astype(str))
outp[1 : r // 2, ic + 1] = col_out
col_out = np.where(self.mask[-r // 2 :, c], "#####", self.data[-r // 2 :, c].astype(str))
outp[r // 2 + 1 : r + 1, ic + 1] = col_out
else:
col_out = np.where(self.mask[:, c], "#####", self.data[:, c].astype(str))
outp[1 : len(self.data) + 1, ic + 1] = col_out
return tabulate(outp[1:], outp[0], tablefmt=fmt, numalign="decimal", stralign="left")
def _setattr_col(self, name, value):
"""Attempt to either assign data columns if set up, or setas setting.
Args:
name (length 1 string):
Column type to work with (one of x,y,z,u,v,w,d,e or f)
value (nd array or column index):
If an ndarray and the column type corresponding to *name* is set up, then overwrite the column(s)
of data with this new data. If an index type, then set the corresponding setas assignment to
these columns.
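
        Example:
            A sketch of both behaviours, assuming setas already designates an
            x column on the instance ``d``::

                >>> d.x = np.linspace(0, 1, len(d))  # overwrite the x column data
                >>> d.x = 0                          # re-assign column 0 as the x column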
"""
if isinstance(value, np.ndarray):
value = np.atleast_2d(value)
if value.shape[0] == self.data.shape[0]:
pass
elif value.shape[1] == self.data.shape[0]:
value = value.T
else:
raise RuntimeError("Value to be assigned to data columns is the wrong shape!")
for i, ix in enumerate(self.find_col(self.setas[name], force_list=True)):
self.data[:, ix] = value[:, i]
elif isinstance(value, index_types):
self._set_setas({name: value})
def _set_mask(self, func, invert=False, cumulative=False, col=0):
"""Apply func to each row in self.data and uses the result to set the mask for the row.
Args:
func (callable):
                A callable object of the form lambda x: True where x is a row of data (a numpy array).
            invert (bool):
                Optionally invert the result of the func test so that it unmasks data instead.
            cumulative (bool):
                If true, then an unmask value doesn't unmask the data, it just leaves it as it is.
            col (int):
                The column to pass as the first argument when func takes two arguments.
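
        Example:
            A sketch that masks every row whose first column is negative,
            using the single-argument callable form::

                >>> d._set_mask(lambda r: r[0] < 0)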
"""
i = -1
args = len(_inspect_.getargs(func.__code__)[0])
for r in self.rows():
i += 1
r.mask = False
if args == 2:
t = func(r[col], r)
else:
t = func(r)
if isinstance(t, (bool, np.bool_)):
if t ^ invert:
self.data[i] = ma.masked
elif not cumulative:
self.data[i] = self._data.data[i]
else:
for j in range(min(len(t), np.shape(self.data)[1])):
if t[j] ^ invert:
self.data[i, j] = ma.masked
elif not cumulative:
self.data[i, j] = self.data.data[i, j]
def _push_mask(self, mask=None):
"""Copy the current data mask to a temporary store and replace it with a new mask if supplied.
Args:
            mask (:py:class:`numpy.array` of bool, bool or None):
                The new data mask to apply (defaults to None = unmask the data).
Returns:
Nothing
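
        Example:
            A sketch of the push/pop protocol for temporarily working with
            unmasked data::

                >>> d._push_mask()     # stash the current mask and clear it
                >>> _ = d.data.mean()  # operate on all of the data
                >>> d._pop_mask()      # restore the previous mask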
"""
self._masks.append(self.mask)
if mask is None:
self.data.mask = False
else:
self.mask = mask
def _pop_mask(self):
"""Replace the mask on the data with the last one stored by _push_mask().
Returns:
Nothing
"""
self.mask = False
self.mask = self._masks.pop() # pylint: disable=E0203
if not self._masks: # pylint: disable=E0203
self._masks = [False]
def _raise_type_error(self, k):
"""Raise a type error when setting an attribute k."""
if isinstance(self._public_attrs[k], tuple):
typ = "one of " + ",".join([str(type(t)) for t in self._public_attrs[k]])
else:
typ = f"a {type(self._public_attrs[k])}"
raise TypeError(f"{k} should be {typ}")
# ============================================================================================
############################ Public Methods ################################################
# ============================================================================================
def add_column(self, column_data, header=None, index=None, func_args=None, replace=False, setas=None):
"""Append a column of data or inserts a column to a datafile instance.
Args:
column_data (:py:class:`numpy.array` or list or callable):
Data to append or insert or a callable function that will generate new data
Keyword Arguments:
header (string):
The text to set the column header to,
if not supplied then defaults to 'col#'
index (index type):
The index (numeric or string) to insert (or replace) the data
func_args (dict):
If column_data is a callable object, then this argument
can be used to supply a dictionary of function arguments to the callable object.
replace (bool):
Replace the data or insert the data (default)
setas (str):
Set the type of column (x,y,z data etc - see :py:attr:`Stoner.Core.DataFile.setas`)
Returns:
self:
The :py:class:`DataFile` instance with the additional column inserted.
Note:
Like most :py:class:`DataFile` methods, this method operates in-place in that it also modifies
the original DataFile Instance as well as returning it.
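
        Examples:
            A sketch of appending a fixed column and a computed column (the
            callable receives each row in turn; headers are illustrative)::

                >>> _ = d.add_column(np.arange(len(d)), header="index")
                >>> _ = d.add_column(lambda r: r[0] * 2, header="doubled")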
"""
        if index is None or (isinstance(index, bool) and index):  # Ensure index is set
index = self.shape[1]
replace = False
elif isinstance(index, int_types) and index == self.shape[1]:
replace = False
else:
index = self.find_col(index)
# Sort out the data and get it into an array of values.
if isinstance(column_data, list):
column_data = np.array(column_data)
if isinstance(column_data, DataArray) and header is None:
header = column_data.column_headers
if isinstance(column_data, np.ndarray):
np_data = column_data
elif callable(column_data):
if isinstance(func_args, dict):
new_data = [column_data(x, **func_args) for x in self]
else:
new_data = [column_data(x) for x in self]
np_data = np.array(new_data)
else:
return NotImplemented
# Sort out the sizes of the arrays
if np_data.ndim == 1:
np_data = np.atleast_2d(np_data).T
cl, cw = np_data.shape
# Make setas
setas = "." * cw if setas is None else setas
if isiterable(setas) and len(setas) == cw:
for s in setas:
if s not in ".-xyzuvwdefpqr":
raise TypeError(
f"setas parameter should be a string or list of letter in the set xyzdefuvw.-, not {setas}"
)
else:
raise TypeError(
f"""setas parameter should be a string or list of letter the same length as the number of columns
being added in the set xyzdefuvw.-, not {setas}"""
)
# Make sure our current data is at least 2D and get its size
if len(self.data.shape) == 1:
self.data = np.atleast_2d(self.data).T
if len(self.data.shape) == 2:
(dr, dc) = self.data.shape
elif not self.data.shape:
self.data = np.array([[]])
(dr, dc) = (0, 0)
# Expand either our current data or new data to have the same number of rows
if cl > dr and dc * dr > 0: # Existing data is finite and too short
self.data = DataArray(np.append(self.data, np.zeros((cl - dr, dc)), 0), setas=self.setas.clone)
elif cl < dr: # New data is too short
            np_data = np.append(np_data, np.zeros((dr - cl, cw)), axis=0)
if np_data.ndim == 1:
np_data = np.atleast_2d(np_data).T
elif dc == 0: # Existing data has no width - replace with cl,0
self.data = DataArray(np.zeros((cl, 0)))
elif dr == 0: # Existing data has no rows - expand existing data with zeros to have right length
            self.data = DataArray(np.append(self.data, np.zeros((cl, dc)), axis=0), setas=self.setas.clone)
# If not replacing, then add extra columns to existing data.
if not replace:
columns = copy.copy(self.column_headers)
old_setas = self.setas.clone
if index == self.data.shape[1]: # appending column
self.data = DataArray(np.append(self.data, np_data, axis=1), setas=self.setas.clone)
else:
self.data = DataArray(
np.append(
self.data[:, :index], np.append(np.zeros_like(np_data), self.data[:, index:], axis=1), axis=1
),
setas=self.setas.clone,
)
for ix in range(0, index):
self.column_headers[ix] = columns[ix]
self.setas[ix] = old_setas[ix]
for ix in range(index, dc):
self.column_headers[ix + cw] = columns[ix]
self.setas[ix + cw] = old_setas[ix]
# Check that we don't need to expand to overwrite with the new data
if index + cw > self.shape[1]:
self.data = DataArray(
                np.append(self.data, np.zeros((self.data.shape[0], index + cw - self.data.shape[1])), axis=1),
setas=self.setas.clone,
)
# Put the data into the array
self.data[:, index : index + cw] = np_data
if header is None: # This will fix the header if not defined.
header = [f"Column {ix}" for ix in range(index, index + cw)]
if isinstance(header, string_types):
header = [header]
if len(header) != cw:
            header.extend([f"Column {x}" for x in range(index, index + cw)])
for ix, (hdr, s) in enumerate(zip(header, setas)):
self.column_headers[ix + index] = hdr
self.setas[index + ix] = s
return self
def columns(self, not_masked=False, reset=False):
"""Iterate over the columns of data int he datafile.
Keyword Args:
            not_masked (bool):
Only iterate over columns that don't have masked elements
reset (bool):
                If true then reset the iterator (immediately stops the current iteration without returning any data).
Yields:
1D array: Returns the next column of data.
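
        Example:
            A sketch of iterating over only the columns with no masked
            elements::

                >>> for col in d.columns(not_masked=True):
                ...     print(col.mean())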
"""
for ix, col in enumerate(self.data.T):
if not_masked and ma.is_masked(col):
continue
if reset:
return
yield self.column(ix)
def del_column(self, col=None, duplicates=False):
"""Delete a column from the current :py:class:`DataFile` object.
Args:
col (int, string, iterable of booleans, list or re):
is the column index as defined for :py:meth:`DataFile.find_col` to the column to be deleted
Keyword Arguments:
duplicates (bool):
(default False) look for duplicated columns
Returns:
self:
The :py:class:`DataFile` object with the column deleted.
Note:
- If duplicates is True and col is None then all duplicate columns are removed,
- if col is not None and duplicates is True then all duplicates of the specified column are removed.
            - If duplicates is False and *col* is either None or False then all masked columns are deleted. If
              *col* is True, then all columns that are not set in the :py:attr:`setas` attribute are deleted.
- If col is a list (duplicates should not be None) then the all the matching columns are found.
- If col is an iterable of booleans, then all columns whose elements are False are deleted.
            - If col is None and duplicates is None, then all columns with at least one element masked
will be deleted
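
        Examples:
            Sketches of the main forms above (the column name is
            illustrative)::

                >>> _ = d.del_column("Temp")           # delete by name
                >>> _ = d.del_column(0)                # delete by index
                >>> _ = d.del_column(duplicates=True)  # drop duplicated columns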
"""
if duplicates:
ch = self.column_headers
dups = []
if col is None:
for i, chi in enumerate(ch):
if chi in ch[i + 1 :]:
dups.append(ch.index(chi, i + 1))