-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyanitools.py
127 lines (102 loc) · 3.89 KB
/
pyanitools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue May 29 12:54:05 2018
@author: rebeccasheng
"""
# Written by Roman Zubatyuk and Justin S. Smith
import h5py
import numpy as np
import platform
import os
from sys import exit
# True when running under Python 3 or newer. The original comparison used
# '> 3', which is False on every Python 3.x interpreter and would only become
# True on Python 4 — clearly not what a flag named PY_VERSION is meant to say.
PY_VERSION = int(platform.python_version().split('.')[0]) >= 3
class datapacker(object):
    """Write-side wrapper that stores arrays as compressed datasets in an HDF5 file."""

    def __init__(self, store_file, mode='w-', complib='gzip', complevel=6):
        """Open the HDF5 store.

        Parameters
        ----------
        store_file : str
            Path of the HDF5 file to create.
        mode : str
            h5py file mode; the default 'w-' fails if the file already exists.
        complib : str
            Compression filter name passed to h5py (e.g. 'gzip').
        complevel : int
            Compression level for the chosen filter.
        """
        self.store = h5py.File(store_file, mode=mode)
        self.clib = complib
        self.clev = complevel

    def store_data(self, store_loc, **kwargs):
        """Create group ``store_loc`` and write each keyword value as a dataset.

        Non-empty lists of (numpy) unicode strings are UTF-8 encoded first,
        because h5py cannot store Python unicode strings directly.
        """
        g = self.store.create_group(store_loc)
        for name, value in kwargs.items():
            # Encode str/np.str_ lists to bytes so h5py accepts them.
            if isinstance(value, list) and value and isinstance(value[0], (np.str_, str)):
                value = [item.encode('utf8') for item in value]
            g.create_dataset(name, data=value,
                             compression=self.clib, compression_opts=self.clev)

    def cleanup(self):
        """Flush and close the underlying HDF5 file."""
        self.store.close()
class anidataloader(object):
    """Read-side iterator over the groups and datasets of an ANI-style HDF5 file."""

    def __init__(self, store_file):
        """Open ``store_file`` read-only; exits the process if the path is missing
        (kept as ``exit`` rather than an exception to preserve the original
        SystemExit behavior callers may rely on)."""
        if not os.path.exists(store_file):
            exit('Error: file not found - ' + store_file)
        # Explicit read-only mode: bare h5py.File(path) is deprecated and this
        # class never writes.
        self.store = h5py.File(store_file, 'r')

    @staticmethod
    def _read_dataset(node):
        """Return the contents of an h5py dataset, decoding byte strings to str.

        ``node[()]`` replaces the ``.value`` attribute, which was removed in
        h5py 3.0 and made the original code crash on modern installations.
        """
        dataset = np.array(node[()])
        if type(dataset) is np.ndarray:
            if dataset.size != 0:
                if type(dataset[0]) is np.bytes_:
                    dataset = [a.decode('ascii') for a in dataset]
        return dataset

    def h5py_dataset_iterator(self, g, prefix=''):
        """Recursively yield one dict per leaf group under ``g``.

        Each dict maps dataset names to their contents plus a 'path' key
        holding the group's slash-joined location.
        """
        for key in g.keys():
            item = g[key]
            path = '{}/{}'.format(prefix, key)
            keys = [i for i in item.keys()]
            if isinstance(item[keys[0]], h5py.Dataset):  # leaf: holds datasets
                data = {'path': path}
                for k in keys:
                    if not isinstance(item[k], h5py.Group):
                        data.update({k: self._read_dataset(item[k])})
                yield data
            else:  # interior group: descend
                yield from self.h5py_dataset_iterator(item, path)

    def __iter__(self):
        """Iterate over every leaf-group data dict in the file."""
        yield from self.h5py_dataset_iterator(self.store)

    def get_group_list(self):
        """Return a list of all top-level groups in the file."""
        return [g for g in self.store.values()]

    def iter_group(self, g):
        """Iterate through the data dicts below the given group."""
        yield from self.h5py_dataset_iterator(g)

    def get_data(self, path, prefix=''):
        """Return the dict of datasets stored at ``path``."""
        item = self.store[path]
        data = {'path': '{}/{}'.format(prefix, path)}
        for k in item.keys():
            if not isinstance(item[k], h5py.Group):
                data.update({k: self._read_dataset(item[k])})
        return data

    def group_size(self):
        """Return the number of top-level groups."""
        return len(self.get_group_list())

    def size(self):
        """Return the total number of items across all top-level groups."""
        return sum(len(g.items()) for g in self.store.values())

    def cleanup(self):
        """Close the HDF5 file."""
        self.store.close()