/
app.py
executable file
·290 lines (220 loc) · 7.76 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
#!/usr/bin/env python
'''Web-based file differ.
For usage, see README.md.
'''
import logging
import mimetypes
import os
import requests
import socket
import sys
from threading import Timer
import time
import webbrowser
from flask import (Flask, render_template, send_from_directory, send_file,
request, jsonify, Response)
import diff
import util
import argparser
# Version string of this program; run() passes it to argparser.parse().
VERSION = '0.12.1'
def determine_path():
    """Return the absolute directory that contains this module.

    Borrowed from wxglade.py. If the module file is a symlink it is
    resolved to its real location first, so the result is the directory of
    the actual file rather than of the link.

    Exits the process with status 1 (after writing an explanation to
    stderr) when ``__file__`` is not defined.
    """
    try:
        root = __file__
    except NameError:
        # Only a missing __file__ is anticipated. A bare ``except`` would
        # also swallow KeyboardInterrupt/SystemExit and hide real bugs.
        sys.stderr.write("I'm sorry, but something is wrong.\n")
        sys.stderr.write(
            "There is no __file__ variable. Please contact the author.\n")
        # Non-zero status: this is a failure, not a normal exit.
        sys.exit(1)

    if os.path.islink(root):
        root = os.path.realpath(root)
    return os.path.dirname(os.path.abspath(root))
def is_hot_reload():
    """Return the value of the WERKZEUG_RUN_MAIN environment variable.

    In debug mode, Werkzeug reloads the app on any changes. The result is
    the raw environment value (truthy when set to a non-empty string) or
    None when the variable is absent.
    """
    return os.getenv('WERKZEUG_RUN_MAIN')
class Config:
    """Default settings object for the app; it currently defines none."""
    #TESTING=True  # not exactly sure what this does...
# The Flask application. Settings are taken from Config first, then from the
# file named by the WEBDIFF_CONFIG environment variable; silent=True keeps a
# missing variable from being an error.
app = Flask(__name__)
app.config.from_object(Config)
app.config.from_envvar('WEBDIFF_CONFIG', silent=True)

# Module-level state; run() assigns both before starting the server.
DIFF = None
PORT = None

if not (app.config['TESTING'] or app.config['DEBUG']):
    # quiet down werkzeug -- no need to log every request.
    logging.getLogger('werkzeug').setLevel(logging.ERROR)
else:
    # Verbose mode: one DEBUG-level stream handler shared by the Flask app
    # logger and the root logger.
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    app.logger.addHandler(handler)

    # '' names the root logger; `log` stays bound to it at module level.
    for logname in ['']:
        log = logging.getLogger(logname)
        log.setLevel(logging.DEBUG)
        log.addHandler(handler)

    logging.getLogger('github').setLevel(logging.ERROR)

# Timestamp (milliseconds) of the most recently received request.
LAST_REQUEST_MS = 0
@app.before_request
def update_last_request_ms():
    """Record the arrival time of the current request in LAST_REQUEST_MS.

    Registered as a before-request hook. The stored value is
    ``time.time()`` converted from seconds to milliseconds.
    """
    global LAST_REQUEST_MS
    now_seconds = time.time()
    LAST_REQUEST_MS = now_seconds * 1000
def error(code, message):
    """Build a JSON error response.

    The body is ``{"code": code, "message": message}`` and the HTTP status
    is always 400, regardless of `code`.
    """
    reply = jsonify({"code": code, "message": message})
    reply.status_code = 400
    return reply
@app.route("/<side>/get_contents", methods=['POST'])
def get_contents(side):
    """Return the text of one side of a diffed file as text/plain.

    `side` comes from the URL and must be 'a' or 'b'. The file is named by
    the 'path' form field and looked up in DIFF via diff.find_diff_index.
    Files that util.is_binary_file reports as binary are not sent; a
    one-line "Binary file (N bytes)" summary is returned instead.

    Any validation or read failure yields a JSON error response (see
    error()) rather than an exception.
    """
    if side not in ('a', 'b'):
        return error('invalid side', 'Side must be "a" or "b", got %s' % side)

    # TODO: switch to index? might be simpler
    path = request.form.get('path', '')
    if not path:
        return error('incomplete', 'Incomplete request (need path)')

    idx = diff.find_diff_index(DIFF, side, path)
    if idx is None:
        return error('not found', 'Invalid path on side %s: %s' % (side, path))

    d = DIFF[idx]
    # Named file_path so the local does not shadow the module-level abs_path().
    file_path = d.a_path if side == 'a' else d.b_path

    try:
        if util.is_binary_file(file_path):
            size = os.path.getsize(file_path)
            contents = "Binary file (%d bytes)" % size
        else:
            # `with` closes the handle promptly instead of leaving it to the
            # garbage collector.
            with open(file_path) as f:
                contents = f.read()
        return Response(contents, mimetype='text/plain')
    except Exception:
        # Deliberately broad: this is the request boundary and any failure
        # to read is reported to the client the same way.
        return error('read-error', 'Unable to read %s' % file_path)
@app.route("/<side>/image/<path:path>")
def get_image(side, path):
    """Return the raw bytes of an image on one side of the diff.

    `side` must be 'a' or 'b'. `path` must have a MIME type (guessed from
    its name) that starts with 'image/' and no content encoding; that MIME
    type is used for the response. The file is looked up in DIFF via
    diff.find_diff_index.

    Any validation or read failure yields a JSON error response (see
    error()) rather than an exception.
    """
    if side not in ('a', 'b'):
        return error('invalid side', 'Side must be "a" or "b", got %s' % side)

    # TODO: switch to index? might be simpler
    if not path:
        return error('incomplete', 'Incomplete request (need path)')

    mime_type, enc = mimetypes.guess_type(path)
    if not mime_type or not mime_type.startswith('image/') or enc is not None:
        return error('wrongtype', 'Requested file of type (%s, %s) as image' % (
            mime_type, enc))

    idx = diff.find_diff_index(DIFF, side, path)
    if idx is None:
        return error('not found', 'Invalid path on side %s: %s' % (side, path))

    d = DIFF[idx]
    # Named file_path so the local does not shadow the module-level abs_path().
    file_path = d.a_path if side == 'a' else d.b_path

    try:
        # Images are binary: text mode would translate newlines on some
        # platforms (or attempt to decode the bytes) and corrupt the data.
        with open(file_path, 'rb') as f:
            contents = f.read()
        return Response(contents, mimetype=mime_type)
    except Exception:
        # Deliberately broad: this is the request boundary and any failure
        # to read is reported to the client the same way.
        return error('read-error', 'Unable to read %s' % file_path)
@app.route("/pdiff/<int:idx>")
def get_pdiff(idx):
    """Send the dilated perceptual-diff image for diff number `idx`.

    The image is produced by util.generate_pdiff_image followed by
    util.generate_dilated_pdiff_image. Returns a JSON error response (see
    error()) when `idx` is past the end of DIFF, and a plain-text 501
    response when ImageMagick is unavailable or fails.
    """
    idx = int(idx)
    try:
        d = DIFF[idx]
    except IndexError:
        # An out-of-range index is a client error, not a server crash.
        return error('not found', 'Invalid diff index: %d' % idx)

    try:
        _, pdiff_image = util.generate_pdiff_image(d.a_path, d.b_path)
        dilated_image = util.generate_dilated_pdiff_image(pdiff_image)
    except util.ImageMagickNotAvailableError:
        return 'ImageMagick is not available', 501
    except util.ImageMagickError as e:
        return 'ImageMagick error %s' % e, 501
    return send_file(dilated_image)
@app.route("/pdiffbbox/<int:idx>")
def get_pdiff_bbox(idx):
    """Return, as JSON, the bounding box of the perceptual diff for `idx`.

    The box is whatever util.get_pdiff_bbox computes from the image made by
    util.generate_pdiff_image. Returns a JSON error response (see error())
    when `idx` is past the end of DIFF, and a plain-text 501 response when
    ImageMagick is unavailable or fails.
    """
    idx = int(idx)
    try:
        d = DIFF[idx]
    except IndexError:
        # An out-of-range index is a client error, not a server crash.
        return error('not found', 'Invalid diff index: %d' % idx)

    try:
        _, pdiff_image = util.generate_pdiff_image(d.a_path, d.b_path)
        bbox = util.get_pdiff_bbox(pdiff_image)
    except util.ImageMagickNotAvailableError:
        return 'ImageMagick is not available', 501
    except util.ImageMagickError as e:
        return 'ImageMagick error %s' % e, 501
    return jsonify(bbox)
# Show the first diff by default
@app.route("/")
def index():
    """Serve the root URL with the same page as diff number 0."""
    first_idx = '0'
    return file_diff(first_idx)
@app.route("/<int:idx>")
def file_diff(idx):
    """Render file_diff.html for diff number `idx`.

    `idx` may be an int or a numeric string (index() passes '0'); it is
    converted with int(). The template also receives the thin list of all
    diff pairs and whether ImageMagick is available.
    """
    template_args = {
        'idx': int(idx),
        'pairs': diff.get_thin_list(DIFF),
        'has_magick': util.is_imagemagick_available(),
    }
    return render_template('file_diff.html', **template_args)
@app.route('/thick/<int:idx>')
def thick_diff(idx):
    """Return, as JSON, the "thick" dict for diff number `idx`.

    The payload is whatever diff.get_thick_dict produces for DIFF[idx].
    Returns a JSON error response (see error()) when `idx` is past the end
    of DIFF.
    """
    idx = int(idx)
    try:
        d = DIFF[idx]
    except IndexError:
        # An out-of-range index is a client error, not a server crash.
        return error('not found', 'Invalid diff index: %d' % idx)
    return jsonify(diff.get_thick_dict(d))
@app.route('/favicon.ico')
def favicon():
    """Serve favicon.ico from the app's static/img directory."""
    icon_dir = os.path.join(app.root_path, 'static/img')
    return send_from_directory(
        icon_dir, 'favicon.ico', mimetype='image/vnd.microsoft.icon')
@app.route('/seriouslykill', methods=['POST'])
def seriouslykill():
    """Stop the server immediately via the hook found in the WSGI environ.

    Raises RuntimeError when the 'werkzeug.server.shutdown' entry is
    missing from the request environ.
    """
    # NOTE(review): recent Werkzeug releases appear to have dropped this
    # environ key -- confirm against the version this project pins.
    shutdown_server = request.environ.get('werkzeug.server.shutdown')
    if shutdown_server is not None:
        shutdown_server()
        return "Shutting down..."
    raise RuntimeError('Not running with the Werkzeug Server')
@app.route('/kill', methods=['POST'])
def kill():
    """Schedule a server shutdown unless another request arrives first.

    Does nothing (beyond replying) when the 'STAY_RUNNING' key is present
    in app.config. Otherwise, after 0.5 seconds a timer thread POSTs to
    /seriouslykill on localhost:PORT, but only if LAST_REQUEST_MS has not
    advanced past the value seen when this handler ran.
    """
    if 'STAY_RUNNING' in app.config:
        return 'Will stay running.'

    request_ms_at_kill = LAST_REQUEST_MS

    def shutdown():
        # subsequent requests abort shutdown
        if LAST_REQUEST_MS > request_ms_at_kill:
            return
        requests.post('http://localhost:%d/seriouslykill' % PORT)

    Timer(0.5, shutdown).start()
    return 'Shutting down...'
def open_browser():
    """Open http://localhost:PORT in a new browser tab.

    Skipped entirely when the 'NO_OPEN_BROWSER' key is present in
    app.config, and skipped (with a debug log line) when is_hot_reload()
    is truthy.
    """
    if 'NO_OPEN_BROWSER' in app.config:
        return

    if is_hot_reload():
        # Fetch the root logger here instead of relying on a module-level
        # `log` name, which is only bound when TESTING/DEBUG logging is
        # configured and would otherwise raise NameError.
        logging.getLogger('').debug('Skipping browser open on reload')
    else:
        webbrowser.open_new_tab('http://localhost:%s' % PORT)
def usage_and_die():
    """Write argparser.USAGE to stderr and exit with status 1."""
    sys.stderr.write(argparser.USAGE)
    raise SystemExit(1)
def pick_a_port(args):
    """Choose the TCP port to serve on.

    Precedence:
      1. ``args['port']`` when the key is present (and is not -1),
      2. the WEBDIFF_PORT environment variable, converted with int(),
      3. a free port obtained by binding a throwaway socket to port 0
         on localhost.

    `args` is a mapping of parsed command-line options. Raises ValueError
    if WEBDIFF_PORT is set to something int() cannot parse.
    """
    # The previous test, `'port' in args != -1`, was a chained comparison
    # meaning `'port' in args and args != -1`; the second half compared the
    # whole mapping to -1 and was always true. -1 is never a usable port,
    # so it is treated as "not specified" here.
    if 'port' in args and args['port'] != -1:
        return args['port']

    env_port = os.environ.get('WEBDIFF_PORT')
    if env_port:
        return int(env_port)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Port 0 asks the OS to pick any free port.
        sock.bind(('localhost', 0))
        return sock.getsockname()[1]
    finally:
        # Close even if bind() fails so the descriptor is not leaked.
        sock.close()
def abs_path(path):
    '''Return `path` unchanged if absolute, else joined onto os.getcwd().

    No normalization is performed on the result.
    '''
    if not os.path.isabs(path):
        path = os.path.join(os.getcwd(), path)
    return path
def is_webdiff_from_head():
    '''Was webdiff invoked as `git webdiff` with no other non-flag args?

    Returns True exactly when the WEBDIFF_FROM_HEAD environment variable is
    set (even to an empty string), and False otherwise.
    '''
    return 'WEBDIFF_FROM_HEAD' in os.environ
def run():
    '''Parse the command line, compute the diff and serve it.

    Sets the module globals DIFF and PORT, prints the serving URL to
    stderr, schedules open_browser() on a 0.1 second timer and then blocks
    in app.run(). On a usage error the message and usage text are written
    to stderr and the process exits with status 1.
    '''
    global DIFF, PORT

    cli_args = sys.argv[1:]
    try:
        options = argparser.parse(cli_args, VERSION)
    except argparser.UsageError as usage_err:
        sys.stderr.write('Error: %s\n\n' % usage_err.message)
        usage_and_die()

    DIFF = argparser.diff_for_args(options)
    verbose = app.config['TESTING'] or app.config['DEBUG']
    if verbose:
        sys.stderr.write('Diff:\n%s' % DIFF)

    PORT = pick_a_port(options)
    banner = '''Serving diffs on http://localhost:%s
Close the browser tab or hit Ctrl-C when you're done.
''' % PORT
    sys.stderr.write(banner)

    # Started on a timer so the browser opens after app.run() below has
    # begun; app.run() blocks.
    Timer(0.1, open_browser).start()
    # NOTE(review): this binds every interface although the banner says
    # localhost -- confirm that exposing the server beyond this machine is
    # intended.
    app.run(host='0.0.0.0', port=PORT)


if __name__ == '__main__':
    run()