/
login_middleware.py
158 lines (135 loc) · 5.72 KB
/
login_middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from django.conf import settings
from django.contrib.auth import login as django_login
from django.template.loader import render_to_string
from django.http import HttpResponse
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from django.shortcuts import redirect
from django.http import HttpResponseForbidden
import re
import os
import logging
import subprocess
logger = logging.getLogger("NTA-Login-Logger")
logger.setLevel(logging.INFO)
PROJECT_ROOT = os.path.abspath(os.path.dirname(__file__))
class Http403Middleware(object):
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
if HttpResponseForbidden.status_code == response.status_code:
logger.info("User login session token timed out")
return login(request, "<span style='color:red;'>Your session has timed out, please log back in to refresh your session.</span>")
else:
return response
def login(request):
next_page = request.GET.get('next')
message = ""
delete_message = False
if "message" in request.COOKIES.keys():
message = request.COOKIES["message"]
delete_message = True
html = render_to_string('login_prompt.html', {
'TITLE': 'User Login', 'next': next_page, 'TEXT': message
}, request=request)
response = HttpResponse()
response.write(html)
if delete_message:
response.delete_cookie('message')
return response
class RequireLoginMiddleware:
def __init__(self, get_response):
self.login_verbose = settings.LOGIN_VERBOSE
self.login_duration = settings.LOGIN_DURATION
if not settings.LOGIN_REQUIRED:
return
self.get_response = get_response
self.login_url = re.compile(settings.LOGIN_URL)
self.nta_username = "ntauser"
self.open_urls = [
'/nta/login'
]
nta_password = self.load_password()
if nta_password is None:
logger.warn("NTA login password as not set.")
return
try:
if not User.objects.filter(username=self.nta_username).exists():
_user = User.objects.create_user(self.nta_username, 'nta@nta.nta', nta_password)
_user.save()
except Exception:
logger.warn(f"User: {self.nta_username} already exists")
def __call__(self, request):
response = self.get_response(request)
return response
def load_password(self):
nta_password = os.getenv('NTA_PASSWORD')
if nta_password is None:
if self.login_verbose:
logger.info(f"Unable to get password as NTA_PASSWORD env was not set.")
return None
reset_env = ""
if os.name == 'posix':
reset_env = 'export NTA_PASSWORD=THEANSWERIS42'
elif os.name == 'nt':
reset_env = 'setx NTA_PASSWORD THEANSWERIS42'
subprocess.Popen(reset_env, shell=True).wait()
return nta_password
def login_auth(self, request):
username = request.POST.get('username')
password = request.POST.get('password')
next_page = request.POST.get('next')
# Redirect back to login page if username is invalid
if username != self.nta_username:
if self.login_verbose:
logger.info(f"Login Auth: Username is invalid. Provided username: {username}")
response = redirect('/nta/login?next={}'.format(next_page))
response.set_cookie('message', "Username is not correct.")
return response
user = authenticate(username=username, password=password)
session_duration = self.login_duration
if user is not None:
if user.is_active:
if self.login_verbose:
logger.info(f"User login successful, redirecting to {next_page}, session will expire in {session_duration} secs.")
request.session.set_expiry(session_duration)
django_login(request, user)
return redirect(next_page)
else:
if self.login_verbose:
logger.info(f"User no longer active, must log back in.")
response = redirect('/nta/login?next={}'.format(next_page))
response.set_cookie('message', "Session has ended, must log back in.")
return response
else:
if self.login_verbose:
logger.info(f"User not logged in.")
response = redirect('/nta/login?next={}'.format(next_page))
response.set_cookie('message', "Password is not correct.")
return response
def process_view(self, request, view_func, view_args, view_kwargs):
assert hasattr(request, 'user')
path = request.path
redirect_path = request.POST.get('next', "")
token = request.GET.get('token')
user = request.user
if self.login_verbose:
logger.info(f"Process view; user: {user}, next: {redirect_path}, token: {token}")
if request.POST and self.login_url.match(path):
return self.login_auth(request)
elif not user.is_authenticated:
if self.open_url_check(path=path):
if self.login_verbose:
logger.info(f"Process view cleared open url for path: {path}")
return
else:
return redirect('/nta/login?next={}'.format(path))
elif user.is_authenticated:
return
else:
return redirect('/nta/login?next={}'.format(path))
def open_url_check(self, path):
if any(p in path for p in self.open_urls):
return True
return False