/
update
executable file
·279 lines (246 loc) · 8.29 KB
/
update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#!/usr/bin/env python3
"""
Makes a request to the Github API and compares
the list of repos from the API to cached information
"""
import os
import json
import time
from itertools import chain
from typing import List, Dict, Set, Any, Tuple, Optional
from pathlib import Path
import toml
import requests
import click
# Loose alias for decoded JSON values (API payloads, cache file contents)
Json = Any
# Repository entries from the datafile, keyed by the repository's full name
RepoDB = Dict[str, Json]
# Directory containing this script; the default cache/data/language files live here
this_dir: Path = Path(__file__).absolute().parent
# X-RateLimit-Remaining values seen on responses, in the order they were received
rate_limit_remaining: List[int] = []
def cached_response_has_expired(
    *, cachefile: Path, max_age_seconds: float = 60 * 30
) -> bool:
    """
    Returns T/F which determines whether or not
    to download fresh data from Github API

    cachefile: path to the JSON cache file
    max_age_seconds: how old the cache file (by modification time) may be
        before it counts as expired; defaults to 30 minutes

    Returns True if the cache file is missing or older than
    max_age_seconds, False if the cached data can be reused
    """
    try:
        # a single stat() call instead of exists() followed by stat(), so the
        # file disappearing in between can't raise an unhandled error
        modified_at = cachefile.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        click.echo("Cache file doesn't exist...", err=True)
        return True
    if time.time() - modified_at > max_age_seconds:
        click.echo("Cache file has expired, re-downloading...", err=True)
        return True
    click.echo("Using cached data...", err=True)
    return False
def _save_rate_limit(resp: requests.Response) -> Optional[int]:
    """
    Record the X-RateLimit-Remaining header of a response

    The parsed value is appended to the module-level
    rate_limit_remaining list and returned. Returns None
    if the header is missing or isn't an integer
    """
    header_name = "X-RateLimit-Remaining"
    if header_name not in resp.headers:
        return None
    try:
        remaining = int(resp.headers[header_name])
    except ValueError:
        # best effort: an unparsable header is ignored
        return None
    else:
        rate_limit_remaining.append(remaining)
        return remaining
# downloads new repository data if needed,
# returns the parsed JSON data
def get_repository_data(
    *, cachefile: Path, github_username: str, unowned: List[str]
) -> List[Json]:
    """
    Return repository information, from the cache file if it is
    still fresh, otherwise from the Github API

    cachefile: JSON file the downloaded data is read from/written to
    github_username: user whose repositories are listed
    unowned: full names ("owner/repo") of additional repositories
        to request individually

    When downloading, every page of the user's repositories is requested
    until an empty page is returned, then each repo in unowned is
    requested, and the combined list is written to cachefile

    Raises requests.HTTPError on an unsuccessful response, and exits
    with status 1 if the remaining rate limit drops to 1 or lower
    while paging
    """
    if not cached_response_has_expired(cachefile=cachefile):
        try:
            data = json.loads(cachefile.read_text())
        except json.decoder.JSONDecodeError:
            click.echo("Failed to load JSON from cache file...", err=True)
        else:
            # explicit check instead of an assert, which is stripped under -O;
            # anything other than a list is treated like a corrupt cache
            if isinstance(data, list):
                return data
            click.echo(
                "Cache file doesn't contain a list, re-downloading...", err=True
            )
    click.echo(
        f"Downloading repository information for {github_username}...",
        err=True,
    )
    headers = {"Accept": "application/vnd.github.full+json"}
    # without a timeout, requests waits indefinitely on an unresponsive server
    request_timeout = 30  # seconds
    # loop through the paginated responses to get all repos
    repo_info: List[Json] = []
    page: int = 1
    while True:
        url = f"https://api.github.com/users/{github_username}/repos?page={page}"
        click.echo(f"Requesting {url}", err=True)
        resp = requests.get(url, headers=headers, timeout=request_timeout)
        resp.raise_for_status()
        ratelimit_left = _save_rate_limit(resp)
        # compare against None explicitly: a remaining count of 0 is falsy,
        # and that is exactly the case that has to stop here
        if ratelimit_left is not None and ratelimit_left <= 1:
            click.echo("Rate limit reached, exiting...", err=True)
            raise SystemExit(1)
        resp_json = resp.json()
        if not resp_json:
            # an empty page means there is nothing left to fetch
            break
        repo_info.extend(resp_json)
        page += 1
    for other_repo in unowned:
        url = f"https://api.github.com/repos/{other_repo}"
        click.echo(f"Requesting {url}", err=True)
        resp = requests.get(url, headers=headers, timeout=request_timeout)
        resp.raise_for_status()
        _save_rate_limit(resp)
        repo_info.append(resp.json())
    with cachefile.open("w") as jf:
        json.dump(repo_info, jf, indent=4)
    return repo_info
# return all data from the data.toml file
def load_data(*, datafile: Path) -> Tuple[Set[str], RepoDB]:
    """
    Parse the TOML datafile into the set of ignored repository
    names (the top-level "ignored" key) and a dict of every other
    top-level entry

    If the datafile doesn't exist, both are returned empty
    """
    if not datafile.exists():
        return set(), {}
    with datafile.open("r") as fp:
        parsed = toml.load(fp)
    ignored: Set[str] = set(parsed["ignored"]) if "ignored" in parsed else set()
    repos: Dict[str, Json] = {
        name: entry for name, entry in parsed.items() if name != "ignored"
    }
    return ignored, repos
# Topics, languages and tags collected while processing repositories;
# offered as completion candidates when prompting for tags
TOPICS: Set[str] = set()
def _prompt_tags(name: str) -> List[str]:
    """
    Prompt for whitespace-separated tags for the repository called
    name, with completion from the collected TOPICS

    Returns the list of tags entered, empty if nothing was typed
    """
    from prompt_toolkit import prompt
    from prompt_toolkit.completion import WordCompleter

    click.echo("(Remember to add language tag if makes sense to do so)", err=True)
    completer = WordCompleter(list(TOPICS))
    answer = prompt(f"{name}; add tag: ", completer=completer).strip()
    # split() with no separator drops surrounding whitespace from each
    # item and gives an empty list for an empty string
    return answer.split()
def _sort_tags(tags: List[str]) -> List[str]:
return list(sorted(set(tags)))
# priority:
# 1: put at the bottom
# 2: order by stars
# 3: put at the top
#
# score:
# acts as a buffer for stars;
# if I want something to appear slightly higher up,
# the order for priority 2 items is calculated by comparing
# score + stars
# prompt me to add any new items
def prompt_new(repo_info: List[Json], repo_data: RepoDB, ignored: Set[str]) -> RepoDB:
    """
    Interactively add repositories that aren't in repo_data yet

    repo_info: repository objects as returned by the Github API
    repo_data: existing entries keyed by full name; updated in place
    ignored: full names of repositories to skip; updated in place

    Public repositories that are neither ignored nor already present
    are either added to ignored or described, tagged and stored in
    repo_data. At most 5 repositories are prompted for per run, and
    nothing is prompted for when PROJECTS_BG_UPDATE is set in the
    environment

    Returns repo_data, with the sorted ignored names stored under
    the "ignored" key
    """
    # collect completion candidates from the API data and existing entries
    for info in repo_info:
        TOPICS.update(topic for topic in info.get("topics", []) if topic)
        if info["language"]:
            TOPICS.add(info["language"])
    for rinfo in repo_data.values():
        TOPICS.update(tag for tag in rinfo.get("tags", []) if tag)

    # the environment doesn't change while looping, so look it up once
    background_update = "PROJECTS_BG_UPDATE" in os.environ
    prompted: int = 0
    for repo in repo_info:
        if prompted >= 5:
            break
        rname: str = repo["full_name"]
        if repo["private"] or rname in ignored or rname in repo_data:
            continue
        if background_update:
            continue
        click.echo(f"Processing {rname}...", err=True)
        prompted += 1
        if click.confirm(f"Ignore '{rname}'?"):
            ignored.add(rname)
            continue
        # the description may be null, and click.edit returns None when the
        # editor is closed without saving; fall back to an empty description
        # instead of failing and losing the answers given so far
        original_desc: str = repo["description"] or ""
        repo_desc: str = click.edit(text=original_desc) or original_desc
        tags = _sort_tags(_prompt_tags(name=rname))
        new_repo_data: Dict[str, Json] = {
            "name": repo["name"],
            "full_name": repo["full_name"],
            "html_url": repo["html_url"],
            "description": repo_desc.strip(),
            "updated_at": repo["updated_at"],
            "tags": tags,
            "language": repo["language"],
            "priority": 2,
            "score": 0,
        }
        if click.confirm("Add URL?"):
            new_repo_data["url"] = click.prompt("URL ")
        if click.confirm("Add Image?"):
            new_repo_data["img"] = click.prompt("Image ")
        repo_data[rname] = new_repo_data
    repo_data["ignored"] = sorted(ignored)
    return repo_data
def classify_tags(tag_data: Dict[str, bool], *, tags: Set[str]) -> Dict[str, bool]:
    """
    Ask whether each tag that isn't classified yet is a language

    tag_data: known classifications (tag -> is a language); updated in place
    tags: the tags to make sure are classified

    Returns tag_data, with an entry for every tag in tags
    """
    # sorted so the prompts appear in the same order on every run,
    # independent of set iteration order
    for tag in sorted(tags):
        if tag in tag_data:
            continue
        click.echo(f"Is {tag} a language? [y/n] ", nl=False)
        ch = click.getchar()
        # accept an uppercase 'Y' as well; any other key counts as "no"
        tag_data[tag] = ch.lower() == "y"
        click.echo()
    return tag_data
@click.command(help=__doc__)
@click.option(
    "-c",
    "--cachefile",
    "use_cachefile",
    default=this_dir / "cache.json",
    type=click.Path(path_type=Path),
    help="JSON cachefile to use",
    show_default=True,
)
@click.option(
    "-d",
    "--datafile",
    "use_datafile",
    default=this_dir / "data.toml",
    type=click.Path(path_type=Path),
    help="TOML datafile to use",
    show_default=True,
)
@click.option(
    "-L",
    "--lang-file",
    default=this_dir / "languages.json",
    type=click.Path(path_type=Path),
    help="store classified tags for languages",
    show_default=True,
)
@click.option(
    "-g",
    "--github-username",
    "use_github_username",
    default="seanbreckenridge",
    help="github username to use",
)
def main(
    use_cachefile: Path, use_datafile: Path, lang_file: Path, use_github_username: str
) -> None:
    """
    Load the datafile, fetch repository information, prompt for new
    repositories, then write the datafile and the language
    classification file back out

    use_cachefile: JSON cache of the API responses
    use_datafile: TOML file with the stored repositories
    lang_file: JSON file mapping each tag to whether it is a language
    use_github_username: user whose repositories are requested
    """
    ignored, repo_data = load_data(datafile=use_datafile)
    # include the trailing slash, otherwise a different owner whose name
    # merely starts with the username ("foo" vs "foobar/repo") would be
    # treated as owned and never requested
    owner_prefix = f"{use_github_username}/"
    repo_info: List[Dict] = get_repository_data(
        cachefile=use_cachefile,
        github_username=use_github_username,
        unowned=[r for r in repo_data if not r.startswith(owner_prefix)],
    )
    repo_data = prompt_new(repo_info, repo_data, ignored)
    with use_datafile.open("w") as tf:
        toml.dump(repo_data, tf)
    tag_data: Dict[str, bool] = {}
    if lang_file.exists():
        tag_data = json.loads(lang_file.read_text())
    # the "ignored" entry is a list rather than a repo dict, so it is skipped;
    # entries without tags are tolerated, the same way prompt_new reads them
    all_tags = set(
        chain.from_iterable(
            r.get("tags", []) for r in repo_data.values() if isinstance(r, dict)
        )
    )
    tag_data = classify_tags(tag_data, tags=all_tags)
    lang_file.write_text(json.dumps(tag_data, indent=4, sort_keys=True))
    if rate_limit_remaining:
        click.echo(
            f"Rate limit remaining this hour: {rate_limit_remaining[-1]}", err=True
        )
# only run the command line interface when executed as a script
if __name__ == "__main__":
    main()