-
Notifications
You must be signed in to change notification settings - Fork 3
/
api.py
155 lines (125 loc) · 5.42 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
""" Neuroscout API client"""
import requests
import re
import jwt
import os
from datetime import datetime
from functools import partialmethod
from . import API_BASE_URL, ROUTE_PATTERN
from . import endpoints
class Neuroscout(object):
    """Neuroscout API client object. This is the access point for the API.

    On construction the client optionally authenticates and then exposes one
    attribute per API route (``analyses``, ``datasets``, ``tasks``, ``runs``,
    ``predictors``, ``predictor_events``, ``user``), each built from the
    matching class in :mod:`endpoints`.
    """

    def __init__(self, email=None, password=None, api_base_url=None):
        """ Initialize Neuroscout object.

        :param email: Email address to use for authorization.
        :type email: str, optional
        :param password: Authentication password
        :type password: str, optional
        :param api_base_url: Alternate base URL for API (for debugging)
        :type api_base_url: str, optional
        """
        self._session = requests.Session()
        self._api_base_url = api_base_url or API_BASE_URL

        # Authentication state; filled in by _authorize() on success.
        self._api_token = None
        self._api_token_exp = None
        self._credentials = None
        self._authorize(email, password)

        # Set up main routes
        self.analyses = endpoints.Analyses(self)
        self.datasets = endpoints.Datasets(self)
        self.tasks = endpoints.Tasks(self)
        self.runs = endpoints.Runs(self)
        self.predictors = endpoints.Predictors(self)
        self.predictor_events = endpoints.PredictorEvents(self)
        self.user = endpoints.User(self)

    def _get_headers(self):
        """ Build authorization header

        :return: ``{'Authorization': 'JWT <token>'}`` when a token is held,
            otherwise ``None``.
        """
        if self._api_token is None:
            return None
        return {'Authorization': 'JWT %s' % self._api_token}

    @staticmethod
    def _fill_optional_segment(segment, variables):
        """ Substitute the ``{name}`` placeholders of one optional segment.

        :param segment: Text of an optional route segment, e.g. ``'/{id}'``.
        :param variables: Mapping of placeholder name to value.
        :return: The formatted segment, or ``''`` if any placeholder is
            missing from ``variables`` or maps to ``None``.
        """
        names = re.findall(r"\{(.*?)\}", segment)
        if not names:
            return segment

        values = {}
        for name in names:
            value = variables.get(name)
            if value is None:
                # One unresolved placeholder drops the whole segment.
                return ''
            values[name] = str(value)
        # Format once with every value so that segments holding several
        # placeholders are handled too.
        return segment.format(**values)

    @staticmethod
    def _join_list_values(params):
        """ Return a copy of ``params`` with list values comma-joined.

        :param params: Mapping of query parameter name to value.
        :return: New dict in which every ``list`` value is replaced by a
            comma separated string of its items; other values are untouched.
        """
        return {
            key: ','.join(str(item) for item in value)
            if isinstance(value, list) else value
            for key, value in params.items()
        }

    def _build_path(self, route, **kwargs):
        """ Build and format a URI for a request.

        Each ``[...]`` section of ``ROUTE_PATTERN`` is optional: it is kept
        (with its placeholders filled from ``kwargs``) only when all of its
        placeholders have a non-``None`` value, and removed otherwise.

        :param route: Primary API route. E.g. 'user'
        :type route: str
        :param kwargs: Variables used to format the optional URI sections.
        :return: Formatted URI
        :rtype: str
        """
        new_path = ROUTE_PATTERN
        for segment in re.findall(r'\[(.*?)\]', ROUTE_PATTERN):
            chunk = self._fill_optional_segment(segment, kwargs)
            new_path = new_path.replace('[%s]' % segment, chunk)

        return new_path.format(base_url=self._api_base_url, route=route)

    def _make_request(self, request, route, sub_route=None, id=None,
                      params=None, data=None,
                      json=None, headers=None, remove_null=True,
                      files=None, **kwargs):
        """ Generic request handler

        :param request: Name of the session method to call ('get', 'post',
            'put', 'delete').
        :param route: Primary API route, passed to :meth:`_build_path`.
        :param sub_route: Optional sub route, passed to :meth:`_build_path`.
        :param id: Optional resource id, passed to :meth:`_build_path`.
        :param params: Query parameters (replaced by ``kwargs`` on 'get').
        :param data: Form body (replaced by ``kwargs`` on 'put'/'post' when
            ``files`` is given).
        :param json: JSON body (replaced by ``kwargs`` on 'put'/'post' when
            no ``files`` are given).
        :param headers: Request headers; defaults to the authorization
            header from :meth:`_get_headers`.
        :param remove_null: If ``True``, drop ``kwargs`` whose value is
            ``None`` before sending.
        :param files: Files forwarded to the session method.
        :param kwargs: Request payload; see ``params``/``data``/``json``.
        :return: Decoded JSON when the response media type is
            ``application/json``, otherwise the raw response content.
        :raises requests.exceptions.HTTPError: On an error status code; the
            server's ``message`` field is appended when available.
        """
        if route != 'auth':
            # The auth route is what refreshes the token, so it must not
            # trigger the expiry check itself.
            self._check_expiry()

        request_function = getattr(self._session, request)

        if remove_null is True:
            kwargs = {k: v for (k, v) in kwargs.items() if v is not None}

        if request == 'get':
            # NOTE: keyword arguments replace any explicit ``params``.
            # Lists are sent as comma separated values.
            params = self._join_list_values(kwargs)
        elif request in ('put', 'post'):
            # NOTE: keyword arguments replace any explicit ``data``/``json``.
            if files:
                data = kwargs
            else:
                json = kwargs

        headers = headers or self._get_headers()

        url = self._build_path(route, sub_route=sub_route, id=id)
        resp = request_function(
            url, json=json, data=data, files=files,
            headers=headers, params=params)

        # Compare only the media type so that a header such as
        # 'application/json; charset=utf-8' is still decoded as JSON.
        content_type = resp.headers.get('Content-Type') or ''
        if content_type.split(';', 1)[0].strip().lower() == 'application/json':
            content = resp.json()
        else:
            content = resp.content

        try:
            resp.raise_for_status()
        except requests.exceptions.HTTPError as error:
            message = str(error)
            if isinstance(content, dict) and \
                    content.get('message') is not None:
                message += "\n Message: {}".format(content['message'])
            # Keep the response attached so callers can inspect it.
            raise requests.exceptions.HTTPError(
                message, response=resp) from error

        return content

    def _authorize(self, email=None, password=None):
        """ Fetch api_token given access credentials

        Credentials that are not given are read from the ``NEUROSCOUT_USER``
        and ``NEUROSCOUT_PASSWORD`` environment variables. If either is
        still missing, nothing happens. On success the token, its local
        expiry time and the credentials used are stored on the client.

        :param email: Email address to use for authorization.
        :type email: str, optional
        :param password: Authentication password
        :type password: str, optional
        """
        from datetime import timedelta

        if email is None:
            email = os.environ.get('NEUROSCOUT_USER')
        if password is None:
            password = os.environ.get('NEUROSCOUT_PASSWORD')

        if email is None or password is None:
            return

        rv = self._post('auth', email=email, password=password)
        self._api_token = rv['access_token']
        # Remembered so that _check_expiry() can refresh the token with the
        # same account, even when they were passed in explicitly.
        self._credentials = (email, password)

        # The signature is not verified: the token is only read to learn
        # its issue/expiry times.
        claims = jwt.decode(self._api_token, verify=False,
                            options={'verify_signature': False})

        # Anchor the token lifetime (exp - iat) to the local clock so the
        # expiry check is independent of the offset between the local clock
        # and the clock that issued the token.
        lifetime = timedelta(seconds=claims['exp'] - claims['iat'])
        self._api_token_exp = datetime.now() + lifetime

    def _check_expiry(self):
        """ Re-authorize if the current token has expired.

        Does nothing when no token is held. Otherwise, once the local
        expiry time has passed, logs in again with the credentials of the
        last successful authorization.
        """
        if self._api_token is None:
            return

        if datetime.now() > self._api_token_exp:
            email, password = self._credentials or (None, None)
            self._authorize(email, password)

    # Per-verb shortcuts around _make_request.
    _get = partialmethod(_make_request, 'get')
    _post = partialmethod(_make_request, 'post')
    _put = partialmethod(_make_request, 'put')
    _delete = partialmethod(_make_request, 'delete')