# main.py
import paramiko
import datetime
import os
import logging
import sage_data_client
import argparse
import matplotlib.pyplot as plt
import xarray as xr
from matplotlib.dates import DateFormatter
from waggle.plugin import Plugin
from pathlib import Path
# SFTP endpoint and login used to reach the MRR-PRO.
# NOTE(review): the credentials are hard-coded in source; consider supplying
# them through plugin configuration or the environment instead.
mrr_ip_address = "10.31.81.113"
mrr_user_name, mrr_password = "mrruser", "metek"

# VSN of the node this plugin runs on (presumably the Waggle node ID; confirm).
vsn = 'W08D'
def recursive_list(sftp):
    """
    Collect the paths of the NetCDF files stored on the MRR.

    The year-month folder names are read from the SFTP connection's current
    directory, while every returned path is built under ``/u/data`` following
    the ``/u/data/<year-month>/<year-month-day>/<file>`` layout. The caller is
    therefore expected to have changed into ``/u/data`` beforehand.

    Parameters
    ----------
    sftp: SFTPClient
        The current SFTP Client.

    Returns
    -------
    list: str list
        Sorted paths of every entry whose name contains '.nc'.
    """
    root = '/u/data'
    nc_paths = []
    for month_dir in sftp.listdir():
        month_path = '%s/%s/' % (root, month_dir)
        for day_dir in sftp.listdir(month_path):
            day_path = '%s%s/' % (month_path, day_dir)
            # Substring match (not a suffix match): names such as
            # '.20230101_000000.nc' are kept on purpose so the caller sees them.
            nc_paths.extend(
                day_path + entry
                for entry in sftp.listdir(day_path)
                if '.nc' in entry)
    nc_paths.sort()
    return nc_paths
def _wait_for_complete_file(sftp, file_list, poll_seconds=5):
    """Re-list the MRR until the newest entry is not a dot-prefixed (partial) file.

    The check is made on the file's base name. The listing holds absolute
    paths, so testing the first character of the path itself would always see
    '/' and never wait. An empty listing is returned unchanged.
    """
    import time

    while file_list and os.path.basename(file_list[-1]).startswith('.'):
        # Pause between polls instead of hammering the SFTP server.
        time.sleep(poll_seconds)
        file_list = recursive_list(sftp)
    return file_list


def _uploaded_file_names(hours):
    """Return the set of file names this task uploaded to beehive in the last *hours* hours."""
    df = sage_data_client.query(
        start="-%dh" % hours,
        filter={"name": "upload", "vsn": vsn, "task": "mrrpro"})
    # A query with no matching records may not carry the filename column at all.
    if 'meta.filename' not in df.columns:
        return set()
    return set(df['meta.filename'].values)


def _save_quicklook(nc_name):
    """Plot Zea, VEL, WIDTH and SNR from *nc_name* into a four-panel PNG.

    The PNG is written to the working directory under the NetCDF file's stem
    and its file name is returned. The dataset and the figure are released
    even when plotting fails.
    """
    png_name = Path(nc_name).stem + '.png'
    # (variable, colormap, vmin, vmax) for each panel, top to bottom.
    panels = (
        ("Zea", 'Spectral_r', 10, 50),
        ("VEL", 'coolwarm', -6, 6),
        ("WIDTH", 'Spectral_r', 0, 5),
        ("SNR", 'Spectral_r', -10, 60),
    )
    with xr.open_dataset(nc_name) as ds:
        fig, axes = plt.subplots(len(panels), 1, figsize=(10, 10))
        try:
            for axis, (var, cmap, vmin, vmax) in zip(axes, panels):
                ds[var].T.plot(cmap=cmap, vmin=vmin, vmax=vmax, ax=axis)
                axis.set_xlabel('Time [UTC]', fontsize=12)
                axis.set_ylabel('Height', fontsize=12)
                axis.set_ylim(0, 5000)
            # As before, the date formatter is applied to the current axes only.
            date_form = DateFormatter("%Y-%m-%d \n %H:%M:%S")
            plt.gca().xaxis.set_major_formatter(date_form)
            fig.tight_layout()
            fig.savefig(png_name, bbox_inches='tight', dpi=150)
        finally:
            plt.close(fig)
    return png_name


def main(args):
    """
    Download recent MRR-PRO files over SFTP, plot a quicklook for each and upload both.

    Parameters
    ----------
    args: argparse.Namespace
        Must provide ``num_files`` (how many of the newest files to handle;
        0 selects every listed file) and ``hour`` (only files whose name
        carries this hour are handled; -1 disables the filter). Both are
        converted with ``int``.

    Notes
    -----
    * With ``num_files == 1`` the function waits until the newest listed file
      is no longer a dot-prefixed partial file.
    * With ``num_files > 1`` beehive is queried for the last ``num_files``
      hours and files already uploaded are skipped. No such check is made
      for ``num_files == 0``.
    * The SFTP transport is always closed when the function exits.
    """
    num_files = int(args.num_files)
    hour = int(args.hour)
    with Plugin() as plugin:
        transport = paramiko.Transport((mrr_ip_address, 22))
        try:
            transport.connect(username=mrr_user_name, password=mrr_password)
            sftp = transport.open_sftp_client()
            sftp.chdir('/u/data')
            file_list = recursive_list(sftp)

            # Make sure we have a complete file from the MRR and continue
            # once we do, when in pull-one-file mode.
            if num_files == 1:
                file_list = _wait_for_complete_file(sftp, file_list)

            # When pulling several files, load the upload history to make
            # sure we are not uploading duplicates.
            file_names = set()
            if num_files > 1:
                print("Accessing beehive...")
                file_names = _uploaded_file_names(num_files)

            for fi in file_list[-num_files:]:
                print(fi)
                name = os.path.basename(fi)
                try:
                    dt = datetime.datetime.strptime(name, '%Y%m%d_%H%M%S.nc')
                except ValueError:
                    # File incomplete, skip. Should not happen in one-file mode.
                    continue
                if hour != -1 and dt.hour != hour:
                    continue
                if name in file_names:
                    print('%s already on beehive, skipping!' % name)
                    continue

                logging.debug("Downloading %s", fi)
                sftp.get(fi, localpath=name)

                # Adam's quicklook plot code
                png_name = _save_quicklook(name)
                plugin.upload_file(png_name)
                logging.debug("Uploading %s to beehive", fi)
                # NOTE(review): a nanosecond timestamp derived from the file
                # name used to be computed here but was never passed on, so
                # uploads keep whatever timestamp upload_file assigns.
                plugin.upload_file(name)
        finally:
            # Closing the transport also tears down the SFTP channel.
            transport.close()
if __name__ == "__main__":
    # Command-line entry point: parse the two options and hand them to main().
    # Values are left as parsed (strings from the CLI, ints for the defaults).
    cli = argparse.ArgumentParser(
        description='Plugin to transfer MRR-PRO data',
        prog='mrrpro-plugin')
    cli.add_argument(
        '-n', '--num-files',
        help='number of files to transfer (0 to transfer all data)',
        default=1)
    cli.add_argument(
        '-hr', '--hour',
        help='Hour of the day to transfer (-1 for all hours)',
        default=-1)
    args = cli.parse_args()
    main(args)