/
iconik_metadata_list_builder.py
146 lines (118 loc) · 5.3 KB
/
iconik_metadata_list_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import json
import sys
import requests
import argparse
import csv
import os
import unicodedata
import re
def parse_args():
    """Parse and return the command line arguments for the batch update tool."""
    ap = argparse.ArgumentParser(description='Batch update values in iconik metadata fields')
    # Credentials and target field are always required.
    ap.add_argument('-u', '--appId', dest='appId', metavar="appId", type=str, help="iconik AppID", required=True)
    ap.add_argument('-s', '--authToken', dest='authToken', metavar="Auth Token", type=str,
                    help="iconik Auth Token", required=True)
    ap.add_argument('-f', '--field', dest='field', metavar="FIELD", help="iconik metadata field key", required=True)
    ap.add_argument('-a', '--address', dest='address', default='https://app.iconik.io',
                    help="iconik URL (default is https://app.iconik.io)")
    # Exactly one of: read options from a file, or dump options to a file.
    src = ap.add_mutually_exclusive_group(required=True)
    src.add_argument('-i', '--input-file', metavar="FILE_PATH", dest='input_file',
                     help="Key/Value input file (line delimited values or csv key/value pairs)")
    src.add_argument('-o', '--output-file', metavar="OUTPUT_FILE_PATH", dest='output_file',
                     help='Key/Value output file (csv file)')
    ap.add_argument('--debug', action='store_true')
    return ap.parse_args()
# define which field types this script works on
# main() refuses to touch any field whose field_type is not listed here,
# so bulk edits only ever run against simple drop-down option lists.
safe_field_types = ['drop_down']
def slugify(s):
    """Normalize *s* into a lowercase, hyphen-separated slug.

    Lowercases the string, treats common separators as word boundaries,
    strips remaining non-word characters, collapses whitespace, and joins
    the words with hyphens (e.g. "Hello World!" -> "hello-world").
    """
    # slugify function to remove special characters from value strings
    s = s.lower()
    for c in [' ', '-', '.', '/']:
        s = s.replace(c, '_')
    # Raw strings for the regex patterns: '\W' / '\s' in a plain literal are
    # invalid escape sequences (SyntaxWarning on Python 3.12+, future error).
    s = re.sub(r'\W', '', s)
    s = s.replace('_', ' ')
    s = re.sub(r'\s+', ' ', s)
    s = s.strip()
    s = s.replace(' ', '-')
    return s
def get_field_data(fieldname, session):
    """Fetch and return the full JSON definition of one iconik metadata field.

    *session* must carry the auth headers and an ``address`` attribute with
    the iconik base URL.  Raises on any non-2xx HTTP response.
    """
    endpoint = '{0}/API/metadata/v1/fields/{1}/'.format(session.address, fieldname)
    response = session.get(endpoint)
    response.raise_for_status()
    return response.json()
def write_file_values(output_file, values):
    """Write field options to *output_file* as CSV rows of value,label.

    *values* is a list of dicts.  Keys other than 'value' and 'label' are
    ignored (the iconik API may include extra option attributes); without
    extrasaction='ignore' DictWriter would raise ValueError on them.
    No header row is written, so the file round-trips through
    get_file_values().
    """
    # newline='' is required by the csv module; otherwise Windows gets
    # doubled line endings (blank rows between records).
    with open(output_file, 'w', newline='') as csvfile:
        csvwriter = csv.DictWriter(csvfile, fieldnames=['value', 'label'], delimiter=',',
                                   quotechar='"', extrasaction='ignore')
        csvwriter.writerows(values)
def get_file_values(input_file):
    """Read key/value pairs from a CSV or line-delimited text file.

    Each non-empty line is either a single label (its value key is derived
    by slugifying the label) or a value,label pair.  Returns a list of
    {"value": ..., "label": ...} dicts.  Exits the process with a message
    if the file is missing or a line has more than two columns.
    """
    if not os.path.exists(input_file):
        print("File " + input_file + " doesn't exist")
        sys.exit(1)  # exit() would report success (status 0) on this error path
    values = []
    try:
        # newline='' per the csv module docs for portable CSV parsing.
        with open(input_file, 'r', newline='') as csvfile:
            options_obj = csv.reader(csvfile, delimiter=',', quotechar='"')
            # this isn't a lookup, so we formulate JSON key/value pairs
            for row in options_obj:
                if not row:
                    # Blank lines produce empty rows; skip them instead of
                    # aborting with a misleading "too many options" error.
                    continue
                if len(row) == 2:
                    values.append({"value": slugify(row[0]), "label": row[1].rstrip()})
                elif len(row) == 1:
                    values.append({"value": slugify(row[0]), "label": row[0].rstrip()})
                else:
                    print("Too many options for CSV file, should have two per line")
                    sys.exit(1)
    except (OSError, csv.Error) as e:
        # Narrowed from a bare `except Exception` to the errors reading/parsing
        # the file can actually raise.
        print("Error trying to parse input file: " + str(e))
        sys.exit(1)
    return values
def main():
    """Entry point: list or update the allowed options of an iconik metadata field.

    With --output-file, dumps the field's current options to CSV.  With
    --input-file, merges the file's options with the field's existing ones,
    de-duplicates, and PATCHes the result back to the iconik API.
    """
    if sys.version_info.major < 3:
        print("This script requires python 3 or greater. If you are running this on a portal system, "
              "you can use /opt/cantemo/python/bin/python")
        # The original only printed the warning and kept running on Python 2.
        sys.exit(1)
    cli_args = parse_args()
    if cli_args.debug:
        import http.client
        import logging
        http.client.HTTPConnection.debuglevel = 1
        # You must initialize logging, otherwise you'll not see debug output.
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
    session = requests.Session()
    session.headers.update({
        'App-ID': cli_args.appId,
        'Auth-Token': cli_args.authToken
    })
    # Stash the base URL on the session so helpers can build absolute URLs.
    session.address = cli_args.address
    # get all the data about a field
    field_data = get_field_data(cli_args.field, session)
    # Guard clauses: bail out early on bad/unsupported fields (exit status 1,
    # where the original's bare exit() reported success).
    if field_data is None:
        print("Error finding field " + cli_args.field)
        sys.exit(1)
    if field_data['field_type'] not in safe_field_types:
        print("Can't use this field type with this script. Exiting.")
        sys.exit(1)
    if cli_args.output_file:
        write_file_values(cli_args.output_file, field_data['options'])
    else:
        # add existing values to the new file-supplied values, then de-duplicate
        # by converting each dict to a canonical (sorted) tuple of items
        new_values = get_file_values(cli_args.input_file) + field_data['options']
        deduped = [dict(t) for t in set(tuple(sorted(d.items())) for d in new_values)]
        new_field_data = {'options': deduped}
        url = session.address + '/API/metadata/v1/fields/' + cli_args.field + '/'
        # json= serializes the body AND sets the Content-Type: application/json
        # header, which the original data=json.dumps(...) call omitted.
        r = session.patch(url, json=new_field_data)
        r.raise_for_status()
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()