From 0eba3b7857e61f864d38b5e3cfa13344445963f8 Mon Sep 17 00:00:00 2001 From: Patrik Dufresne Date: Mon, 18 Oct 2021 10:27:33 -0400 Subject: [PATCH] Add CSRF to user, ssh and repo management --- rdiffweb/controller/cherrypy_wtf.py | 42 ++++- rdiffweb/controller/page_admin.py | 153 ++++++++++-------- rdiffweb/controller/page_delete.py | 29 ++-- rdiffweb/controller/pref_sshkeys.py | 61 ++++--- rdiffweb/controller/tests/test_page_admin.py | 54 +++++-- rdiffweb/controller/tests/test_page_delete.py | 22 ++- .../controller/tests/test_page_prefs_ssh.py | 51 ++++-- rdiffweb/templates/admin_users.html | 5 +- rdiffweb/templates/prefs_sshkeys.html | 9 +- rdiffweb/test.py | 14 +- 10 files changed, 309 insertions(+), 131 deletions(-) diff --git a/rdiffweb/controller/cherrypy_wtf.py b/rdiffweb/controller/cherrypy_wtf.py index 1b83ace5..81dd243f 100644 --- a/rdiffweb/controller/cherrypy_wtf.py +++ b/rdiffweb/controller/cherrypy_wtf.py @@ -1,5 +1,26 @@ -from wtforms.form import Form +# -*- coding: utf-8 -*- +# rdiffweb, A web interface to rdiff-backup repositories +# Copyright (C) 2012-2021 rdiffweb contributors +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +from datetime import timedelta + import cherrypy +from wtforms.csrf.session import SessionCSRF +from wtforms.form import Form SUBMIT_METHODS = {'POST', 'PUT', 'PATCH', 'DELETE'} @@ -26,14 +47,26 @@ def getlist(self, key): _AUTO = _ProxyFormdata() +# Provide a default secret +_CSRF_SECRET = os.environ.get('RDIFFWEB_CSRF_SECRET', 'b5WPUOw3Zd7gr1ligLilAAUA9zuWkW41') + class CherryForm(Form): """ Custom implementation of WTForm for cherrypy to support kwargs parms. - + If ``formdata`` is not specified, this will use cherrypy.request.params Explicitly pass ``formdata=None`` to prevent this. """ + class Meta: + csrf = True + csrf_class = SessionCSRF + csrf_secret = _CSRF_SECRET.encode('utf-8') + csrf_time_limit = timedelta(minutes=30) + + @property + def csrf_context(self): + return cherrypy.session def __init__(self, formdata=_AUTO, **kwargs): super().__init__(formdata=formdata, **kwargs) @@ -51,3 +84,8 @@ def validate_on_submit(self): This is a shortcut for ``form.is_submitted() and form.validate()``. """ return self.is_submitted() and self.validate() + + @property + def error_message(self): + if self.errors: + return ' '.join(['%s: %s' % (field, ', '.join(messages)) for field, messages in self.errors.items()]) diff --git a/rdiffweb/controller/page_admin.py b/rdiffweb/controller/page_admin.py index 058e77d0..d914ef61 100644 --- a/rdiffweb/controller/page_admin.py +++ b/rdiffweb/controller/page_admin.py @@ -22,22 +22,21 @@ import pwd import subprocess import sys +from collections import OrderedDict import cherrypy import humanfriendly import psutil -from wtforms import validators, widgets -from wtforms.fields.core import StringField, SelectField, Field -from wtforms.fields.html5 import EmailField -from wtforms.fields.simple import PasswordField - from rdiffweb.controller import Controller, flash from rdiffweb.controller.cherrypy_wtf import CherryForm from rdiffweb.core.config import Option from rdiffweb.core.i18n import ugettext as _ from rdiffweb.core.quota import QuotaUnsupported from rdiffweb.core.store import ADMIN_ROLE, MAINTAINER_ROLE, USER_ROLE -from collections import OrderedDict +from wtforms import validators, widgets +from wtforms.fields.core import Field, SelectField, StringField +from wtforms.fields.html5 import EmailField +from wtforms.fields.simple import PasswordField # Define the logger logger = logging.getLogger(__name__) @@ -49,12 +48,18 @@ def get_pyinfo(): yield _('OS Version'), '%s %s (%s %s)' % (platform.system(), platform.release(), distro.linux_distribution()[0].capitalize(), distro.linux_distribution()[1]) except: yield _('OS Version'), '%s %s' % (platform.system(), platform.release()) - if hasattr(os, 'path'): yield _('OS Path'), os.environ['PATH'] - if hasattr(sys, 'version'): yield _('Python Version'), ''.join(sys.version) - if hasattr(sys, 'subversion'): yield _('Python Subversion'), ', '.join(sys.subversion) - if hasattr(sys, 'prefix'): yield _('Python Prefix'), sys.prefix - if hasattr(sys, 'executable'): yield _('Python Executable'), sys.executable - if hasattr(sys, 'path'): yield _('Python Path'), ', '.join(sys.path) + if hasattr(os, 'path'): + yield _('OS Path'), os.environ['PATH'] + if hasattr(sys, 'version'): + yield _('Python Version'), ''.join(sys.version) + if hasattr(sys, 'subversion'): + yield _('Python Subversion'), ', '.join(sys.subversion) + if hasattr(sys, 'prefix'): + yield _('Python Prefix'), sys.prefix + if hasattr(sys, 'executable'): + yield _('Python Executable'), sys.executable + if hasattr(sys, 'path'): + yield _('Python Path'), ', '.join(sys.path) def get_osinfo(): @@ -71,7 +76,8 @@ def pw_name(uid): except: return - if hasattr(sys, 'getfilesystemencoding'): yield _('File System Encoding'), sys.getfilesystemencoding() + if hasattr(sys, 'getfilesystemencoding'): + yield _('File System Encoding'), sys.getfilesystemencoding() if hasattr(os, 'getcwd'): yield _('Current Working Directory'), os.getcwd() if hasattr(os, 'getegid'): @@ -167,10 +173,9 @@ class UserForm(CherryForm): validators=[validators.optional()], description=_("Disk spaces (in bytes) used by this user.")) - @property - def error_message(self): - if self.errors: - return ' '.join(['%s: %s' % (field, ', '.join(messages)) for field, messages in self.errors.items()]) + +class DeleteUserForm(CherryForm): + username = StringField(_('Username'), validators=[validators.required()]) @cherrypy.tools.is_admin() @@ -180,6 +185,67 @@ class AdminPage(Controller): logfile = Option('log_file') logaccessfile = Option('log_access_file') + def _add_or_edit_user(self, action, form): + assert action in ['add', 'edit'] + assert form + # Validate form. + if not form.validate(): + flash(form.error_message, level='error') + return + if action == 'add': + user = self.app.store.add_user(form.username.data, form.password.data) + else: + user = self.app.store.get_user(form.username.data) + if form.password.data: + user.set_password(form.password.data, old_password=None) + # Don't allow the user to changes it's "role" state. + if form.username.data != self.app.currentuser.username: + user.role = form.role.data + user.email = form.email.data or '' + if form.user_root.data: + user.user_root = form.user_root.data + if not user.valid_user_root(): + flash(_("User's root directory %s is not accessible!") % user.user_root, level='error') + logger.warning("user's root directory %s is not accessible" % user.user_root) + # Try to update disk quota if the human readable value changed. + # Report error using flash. + if form.disk_quota and form.disk_quota.data: + try: + new_quota = form.disk_quota.data + old_quota = humanfriendly.parse_size(humanfriendly.format_size(user.disk_quota, binary=True)) + if old_quota != new_quota: + user.disk_quota = new_quota + flash(_("User's quota updated"), level='success') + except QuotaUnsupported as e: + flash(_("Setting user's quota is not supported"), level='warning') + except Exception as e: + flash(_("Failed to update user's quota: %s") % e, level='error') + logger.warning("failed to update user's quota", exc_info=1) + if action == 'add': + flash(_("User added successfully.")) + else: + flash(_("User information modified successfully.")) + + def _delete_user(self, action, form): + assert action == 'delete' + assert form + # Validate form. + if not form.validate(): + flash(form.error_message, level='error') + return + if form.username.data == self.app.currentuser.username: + flash(_("You cannot remove your own account!"), level='error') + else: + try: + user = self.app.store.get_user(form.username.data) + if user: + user.delete() + flash(_("User account removed.")) + else: + flash(_("User doesn't exists!"), level='warning') + except ValueError as e: + flash(e, level='error') + def _get_log_files(self): """ Return a list of log files to be shown in admin area. @@ -226,57 +292,10 @@ def users(self, criteria=u"", search=u"", action=u"", **kwargs): # If we're just showing the initial page, just do that form = UserForm() - if action in ["add", "edit"] and not form.validate(): - # Display the form error to user using flash - flash(form.error_message, level='error') - - elif action in ["add", "edit"] and form.validate(): - if action == 'add': - user = self.app.store.add_user(form.username.data, form.password.data) - else: - user = self.app.store.get_user(form.username.data) - if form.password.data: - user.set_password(form.password.data, old_password=None) - # Don't allow the user to changes it's "role" state. - if form.username.data != self.app.currentuser.username: - user.role = form.role.data - user.email = form.email.data or '' - if form.user_root.data: - user.user_root = form.user_root.data - if not user.valid_user_root(): - flash(_("User's root directory %s is not accessible!") % user.user_root, level='error') - logger.warning("user's root directory %s is not accessible" % user.user_root) - # Try to update disk quota if the human readable value changed. - # Report error using flash. - if form.disk_quota and form.disk_quota.data: - try: - new_quota = form.disk_quota.data - old_quota = humanfriendly.parse_size(humanfriendly.format_size(user.disk_quota, binary=True)) - if old_quota != new_quota: - user.disk_quota = new_quota - flash(_("User's quota updated"), level='success') - except QuotaUnsupported as e: - flash(_("Setting user's quota is not supported"), level='warning') - except Exception as e: - flash(_("Failed to update user's quota: %s") % e, level='error') - logger.warning("failed to update user's quota", exc_info=1) - if action == 'add': - flash(_("User added successfully.")) - else: - flash(_("User information modified successfully.")) + if action in ["add", "edit"]: + self._add_or_edit_user(action, form) elif action == 'delete': - if form.username.data == self.app.currentuser.username: - flash(_("You cannot remove your own account!"), level='error') - else: - try: - user = self.app.store.get_user(form.username.data) - if user: - user.delete() - flash(_("User account removed.")) - else: - flash(_("User doesn't exists!"), level='warning') - except ValueError as e: - flash(e, level='error') + self._delete_user(action, DeleteUserForm()) params = { "form": UserForm(formdata=None), diff --git a/rdiffweb/controller/page_delete.py b/rdiffweb/controller/page_delete.py index 5e0663ba..e9af5127 100644 --- a/rdiffweb/controller/page_delete.py +++ b/rdiffweb/controller/page_delete.py @@ -24,31 +24,42 @@ import logging import cherrypy -from rdiffweb.controller import Controller, validate +from rdiffweb.controller import Controller, flash +from rdiffweb.controller.cherrypy_wtf import CherryForm from rdiffweb.controller.dispatch import poppath from rdiffweb.controller.filter_authorization import is_maintainer - +from rdiffweb.core.i18n import ugettext as _ +from wtforms import validators +from wtforms.fields.core import StringField _logger = logging.getLogger(__name__) +class DeleteRepoForm(CherryForm): + confirm = StringField(_('Confirmation'), validators=[validators.required()]) + redirect = StringField(default='/') + + @poppath() class DeletePage(Controller): @cherrypy.expose - def default(self, path=b"", confirm=None, redirect='/', **kwargs): + def default(self, path=b"", **kwargs): # Check permissions on path/repo unused, path_obj = self.app.store.get_repo_path(path) # Check user's permissions is_maintainer() + # validate form + form = DeleteRepoForm() + if not form.validate(): + raise cherrypy.HTTPError(400, form.error_message) + # Validate the name - validate(confirm) - if confirm != path_obj.display_name: - _logger.info("do not delete repo, bad confirmation %r != %r", confirm, path_obj.display_name) - raise cherrypy.HTTPError(400) + if form.confirm.data != path_obj.display_name: + _logger.info("do not delete repo, bad confirmation %r != %r", form.confirm.data, path_obj.display_name) + raise cherrypy.HTTPError(400, 'bad confirmation') # Delete repository self.app.scheduler.add_task(path_obj.delete, args=[]) - - raise cherrypy.HTTPRedirect(redirect) + raise cherrypy.HTTPRedirect(form.redirect.data) diff --git a/rdiffweb/controller/pref_sshkeys.py b/rdiffweb/controller/pref_sshkeys.py index faaff885..4900a4f4 100644 --- a/rdiffweb/controller/pref_sshkeys.py +++ b/rdiffweb/controller/pref_sshkeys.py @@ -47,7 +47,7 @@ def validate_key(unused_form, field): raise ValidationError(_("Invalid SSH key.")) -class SSHForm(CherryForm): +class SshForm(CherryForm): title = StringField( _('Title'), description=_('The title is an optional description to identify the key. e.g.: bob@thinkpad-t530'), @@ -60,6 +60,10 @@ class SSHForm(CherryForm): fingerprint = StringField('Fingerprint') +class DeleteSshForm(CherryForm): + fingerprint = StringField('Fingerprint') + + class SSHKeysPlugin(Controller): """ Plugin to configure SSH keys. @@ -69,30 +73,45 @@ class SSHKeysPlugin(Controller): panel_name = _('SSH Keys') + def _add_key(self, action, form): + assert action == 'add' + assert form + if not form.validate(): + for unused, messages in form.errors.items(): + for message in messages: + flash(message, level='warning') + return + try: + self.app.currentuser.add_authorizedkey(key=form.key.data, comment=form.title.data) + except DuplicateSSHKeyError as e: + flash(str(e), level='error') + except: + flash(_("Unknown error while adding the SSH Key"), level='error') + _logger.warning("error adding ssh key", exc_info=1) + + def _delete_key(self, action, form): + assert action == 'delete' + assert form + if not form.validate(): + for unused, messages in form.errors.items(): + for message in messages: + flash(message, level='warning') + return + is_maintainer() + try: + self.app.currentuser.delete_authorizedkey(form.fingerprint.data) + except: + flash(_("Unknown error while removing the SSH Key"), level='error') + _logger.warning("error removing ssh key", exc_info=1) + def render_prefs_panel(self, panelid, action=None, **kwargs): # @UnusedVariable # Handle action - form = SSHForm() - if action == "add" and not form.validate(): - for unused_field, messages in form.errors.items(): - for message in messages: - flash(message, level='warning') - elif action == 'add': - # Add the key to the current user. - try: - self.app.currentuser.add_authorizedkey(key=form.key.data, comment=form.title.data) - except DuplicateSSHKeyError as e: - flash(str(e), level='error') - except: - flash(_("Unknown error while adding the SSH Key"), level='error') - _logger.warning("error adding ssh key", exc_info=1) + form = SshForm() + if action == "add": + self._add_key(action, form) elif action == 'delete': - is_maintainer() - try: - self.app.currentuser.delete_authorizedkey(form.fingerprint.data) - except: - flash(_("Unknown error while removing the SSH Key"), level='error') - _logger.warning("error removing ssh key", exc_info=1) + self._delete_key(action, DeleteSshForm()) # Get SSH keys if file exists. params = { diff --git a/rdiffweb/controller/tests/test_page_admin.py b/rdiffweb/controller/tests/test_page_admin.py index f4eef7d4..8d3e07e6 100644 --- a/rdiffweb/controller/tests/test_page_admin.py +++ b/rdiffweb/controller/tests/test_page_admin.py @@ -20,9 +20,7 @@ @author: Patrik Dufresne """ -import logging import os -import unittest from unittest.mock import ANY, MagicMock import rdiffweb.test @@ -33,7 +31,7 @@ class AbstractAdminTest(rdiffweb.test.WebCase): """Class to regroup command method to test admin page.""" - def _add_user(self, username=None, email=None, password=None, user_root=None, role=None): + def _add_user(self, username=None, email=None, password=None, user_root=None, role=None, csrf_token=None): b = {} b['action'] = 'add' if username is not None: @@ -46,9 +44,10 @@ def _add_user(self, username=None, email=None, password=None, user_root=None, ro b['user_root'] = user_root if role is not None: b['role'] = str(role) + b['csrf_token'] = csrf_token or self.getCsrfToken("/admin/users/") self.getPage("/admin/users/", method='POST', body=b) - def _edit_user(self, username=None, email=None, password=None, user_root=None, role=None, disk_quota=None): + def _edit_user(self, username=None, email=None, password=None, user_root=None, role=None, disk_quota=None, csrf_token=None): b = {} b['action'] = 'edit' if username is not None: @@ -63,11 +62,13 @@ def _edit_user(self, username=None, email=None, password=None, user_root=None, r b['role'] = str(role) if disk_quota is not None: b['disk_quota'] = disk_quota + b['csrf_token'] = csrf_token or self.getCsrfToken("/admin/users/") self.getPage("/admin/users/", method='POST', body=b) - def _delete_user(self, username='test1'): + def _delete_user(self, username='test1', csrf_token=None): b = {'action': 'delete', 'username': username} + b['csrf_token'] = csrf_token or self.getCsrfToken("/admin/users/") self.getPage("/admin/users/", method='POST', body=b) @@ -76,15 +77,50 @@ class AdminUsersAsAdminTest(AbstractAdminTest): login = True + def test_add_user_without_csrf(self): + # Given I'm logging as Admin + # When trying add a user with invalid csrf token + self._add_user("admin_role", "admin_role@test.com", "test2", "/home/", ADMIN_ROLE, csrf_token='1234567890##invalid') + # Then the form is refused + self.assertStatus(200) + self.assertInBody('CSRF failed') + # Then user is not created + self.assertIsNone(self.app.store.get_user('admin_role')) + + def test_edit_user_without_csrf(self): + # Given an existing user + self._add_user("myuser", "myuser@test.com", "test2", "/home/", MAINTAINER_ROLE) + # When trying edit a user with invalid csrf token + self._edit_user(username='myuser', email="newemail@test.com", csrf_token='1234567890##invalid') + # Then the form is refused + self.assertStatus(200) + self.assertInBody('CSRF failed') + # Then user is not updated + self.assertEqual("myuser@test.com", self.app.store.get_user('myuser').email) + + def test_delete_user_without_csrf(self): + # Given an existing user + self._add_user("myuser", "myuser@test.com", "test2", "/home/", MAINTAINER_ROLE) + # When trying edit a user with invalid csrf token + self._delete_user(username='myuser', csrf_token='1234567890##invalid') + # Then the form is refused + self.assertStatus(200) + self.assertInBody('CSRF failed') + # Then user is not delete + self.assertIsNotNone(self.app.store.get_user('myuser').email) + def test_add_user_with_role(self): # Add user to be listed self._add_user("admin_role", "admin_role@test.com", "test2", "/home/", ADMIN_ROLE) + self.assertStatus(200) self.assertEqual(ADMIN_ROLE, self.app.store.get_user('admin_role').role) self._add_user("maintainer_role", "maintainer_role@test.com", "test2", "/home/", MAINTAINER_ROLE) + self.assertStatus(200) self.assertEqual(MAINTAINER_ROLE, self.app.store.get_user('maintainer_role').role) self._add_user("user_role", "user_role@test.com", "test2", "/home/", USER_ROLE) + self.assertStatus(200) self.assertEqual(USER_ROLE, self.app.store.get_user('user_role').role) def test_add_user_with_invalid_role(self): @@ -202,7 +238,7 @@ def test_delete_user_admin(self): Verify failure to delete our self. """ # Create another admin user - self._add_user('admin2', '', 'password', '' , ADMIN_ROLE) + self._add_user('admin2', '', 'password', '', ADMIN_ROLE) self.getPage("/logout/") self._login('admin2', 'password') @@ -348,21 +384,21 @@ def test_add_user(self): """ Check if adding user is forbidden. """ - self._add_user("test2", "test2@test.com", "test2", "/tmp/", USER_ROLE) + self._add_user("test2", "test2@test.com", "test2", "/tmp/", USER_ROLE, csrf_token='1234567890##invalid') self.assertStatus(403) def test_delete_user(self): """ Check if deleting user is forbidden. """ - self._delete_user("test") + self._delete_user("test", csrf_token='1234567890##invalid') self.assertStatus(403) def test_edit_user(self): """ Check if editing user is forbidden. """ - self._edit_user("test", "test1@test.com", "test", "/var/invalid/", USER_ROLE) + self._edit_user("test", "test1@test.com", "test", "/var/invalid/", USER_ROLE, csrf_token='1234567890##invalid') self.assertStatus(403) def test_users(self): diff --git a/rdiffweb/controller/tests/test_page_delete.py b/rdiffweb/controller/tests/test_page_delete.py index 1b9c3c00..5ca9b644 100644 --- a/rdiffweb/controller/tests/test_page_delete.py +++ b/rdiffweb/controller/tests/test_page_delete.py @@ -37,12 +37,13 @@ class DeleteRepoTest(rdiffweb.test.WebCase): login = True - def _delete(self, user, repo, confirm, **kwargs): + def _delete(self, user, repo, confirm, redirect=None, csrf_token=None): body = {} if confirm is not None: body.update({'confirm': confirm}) - if kwargs: - body.update(kwargs) + if redirect is not None: + body['redirect'] = redirect + body['csrf_token'] = csrf_token or self.getCsrfToken('/prefs/sshkeys') self.getPage("/delete/" + user + "/" + repo + "/", method="POST", body=body) @skipIf(RDIFF_BACKUP_VERSION < (2, 0, 1), "rdiff-backup-delete is available since 2.0.1") @@ -61,6 +62,18 @@ def test_delete_path(self): with self.assertRaises(DoesNotExistError): self.app.store.get_repo_path('admin/testcases/Revisions', as_user=admin) + def test_delete_with_csrf_token(self): + # Given an authenticated user with an existing repositor + admin = self.app.store.get_user('admin') + self.app.store.get_repo_path('admin/testcases/Revisions', as_user=admin) + # When trying to delete a repository without a valide csrf token + self._delete(self.USERNAME, 'testcases/Revisions', 'Revisions', csrf_token='1234567890##invalid') + # Then the form is refused + self.assertStatus(400) + self.assertInBody('CSRF failed') + # Then repository is not deleted. + self.assertIsNotNone(self.app.store.get_repo_path('admin/testcases/Revisions', as_user=admin)) + @skipIf(RDIFF_BACKUP_VERSION < (2, 0, 1), "rdiff-backup-delete is available since 2.0.1") def test_delete_path_with_wrong_confirm(self): # Check directory exists @@ -118,8 +131,9 @@ def test_delete_repo_without_confirm(self): self.assertEqual(['broker-repo', 'testcases'], [r.name for r in self.app.store.get_user('admin').repo_objs]) # Delete repo without confirmation. self._delete(self.USERNAME, self.REPO, None) - # TODO Make sure the repository is not delete + # Make sure the repository is not delete self.assertStatus(400) + self.assertInBody('confirm: This field is required') self.assertEqual(['broker-repo', 'testcases'], [r.name for r in self.app.store.get_user('admin').repo_objs]) def test_delete_repo_as_admin(self): diff --git a/rdiffweb/controller/tests/test_page_prefs_ssh.py b/rdiffweb/controller/tests/test_page_prefs_ssh.py index 325adea2..6206823f 100644 --- a/rdiffweb/controller/tests/test_page_prefs_ssh.py +++ b/rdiffweb/controller/tests/test_page_prefs_ssh.py @@ -21,31 +21,30 @@ """ -import logging -import unittest - import rdiffweb.test +PREFS_SSHKEYS = "/prefs/sshkeys/" -class SSHKeysTest(rdiffweb.test.WebCase): - PREFS_SSHKEYS = "/prefs/sshkeys/" +class SSHKeysTest(rdiffweb.test.WebCase): login = True - def _delete_ssh_key(self, fingerprint): + def _delete_ssh_key(self, fingerprint, csrf_token=None): b = {'action': 'delete', 'fingerprint': fingerprint} - self.getPage(self.PREFS_SSHKEYS, method='POST', body=b) + b['csrf_token'] = csrf_token or self.getCsrfToken(PREFS_SSHKEYS) + self.getPage(PREFS_SSHKEYS, method='POST', body=b) - def _add_ssh_key(self, title, key): + def _add_ssh_key(self, title, key, csrf_token=None): b = {'action': 'add', 'title': title, 'key': key} - self.getPage(self.PREFS_SSHKEYS, method='POST', body=b) + b['csrf_token'] = csrf_token or self.getCsrfToken(PREFS_SSHKEYS) + self.getPage(PREFS_SSHKEYS, method='POST', body=b) def test_page(self): - self.getPage(self.PREFS_SSHKEYS) + self.getPage(PREFS_SSHKEYS) self.assertStatus('200 OK') def test_add(self): @@ -61,10 +60,25 @@ def test_add(self): self.assertEqual(1, len(list(user.authorizedkeys))) # Show page - self.getPage(self.PREFS_SSHKEYS) + self.getPage(PREFS_SSHKEYS) self.assertInBody("test@mysshkey") self.assertInBody("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91") + def test_add_without_csrf_token(self): + # Given an authenticated user. + # When trying to add a key without valid csrf token. + self._add_ssh_key( + "test@mysshkey", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDSEN5VTn9MLituZvdYTZMbZEaMxe0UuU7BelxHkvxzSpVWtazrIBEc3KZjtVoK9F3+0kd26P4DzSQuPUl3yZDgyZZeXrF6p2GlEA7A3tPuOEsAQ9c0oTiDYktq5/Go8vD+XAZKLd//qmCWW1Jg4datkWchMKJzbHUgBrBH015FDbGvGDWYTfVyb8I9H+LQ0GmbTHsuTu63DhPODncMtWPuS9be/flb4EEojMIx5Vce0SNO9Eih38W7jTvNWxZb75k5yfPJxBULRnS5v/fPnDVVtD3JSGybSwKoMdsMX5iImAeNhqnvd8gBu1f0IycUQexTbJXk1rPiRcF13SjKrfXz ikus060@ikus060-t530", + csrf_token='1234567890##invalid', + ) + # Then form is refused. + self.assertStatus(200) + self.assertInBody('CSRF failed') + # Then ssh key is not added. + self.getPage(PREFS_SSHKEYS) + self.assertNotInBody("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91") + def test_add_duplicate(self): # Delete existing keys user = self.app.store.get_user('admin') @@ -112,6 +126,21 @@ def test_delete(self): self.assertStatus('200 OK') self.assertEqual(0, len(list(user.authorizedkeys))) + def test_delete_without_csrf_token(self): + # Given an authenticated user with an existing SSH Key + user = self.app.store.get_user('admin') + self._add_ssh_key("test@mysshkey", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDSEN5VTn9MLituZvdYTZMbZEaMxe0UuU7BelxHkvxzSpVWtazrIBEc3KZjtVoK9F3+0kd26P4DzSQuPUl3yZDgyZZeXrF6p2GlEA7A3tPuOEsAQ9c0oTiDYktq5/Go8vD+XAZKLd//qmCWW1Jg4datkWchMKJzbHUgBrBH015FDbGvGDWYTfVyb8I9H+LQ0GmbTHsuTu63DhPODncMtWPuS9be/flb4EEojMIx5Vce0SNO9Eih38W7jTvNWxZb75k5yfPJxBULRnS5v/fPnDVVtD3JSGybSwKoMdsMX5iImAeNhqnvd8gBu1f0IycUQexTbJXk1rPiRcF13SjKrfXz ikus060@ikus060-t530") + self.assertStatus('200 OK') + self.assertEqual(1, len(list(user.authorizedkeys))) + # When trying to delete a key without valid csrf token. + self._delete_ssh_key("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91", csrf_token='1234567890##invalid') + # Then form is refused. + self.assertStatus(200) + self.assertInBody('CSRF failed') + # Then ssh key is not delete. + self.getPage(PREFS_SSHKEYS) + self.assertInBody("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91") + def test_delete_invalid(self): # Delete existing keys user = self.app.store.get_user('admin') diff --git a/rdiffweb/templates/admin_users.html b/rdiffweb/templates/admin_users.html index 20419ed8..52112aec 100644 --- a/rdiffweb/templates/admin_users.html +++ b/rdiffweb/templates/admin_users.html @@ -55,7 +55,7 @@ data-toggle="modal" data-target="#edit-user-{{ user.username }}-modal"> {% trans %}Edit{% endtrans %} - {{ button_confirm(label=_('Delete'), target="#delete-user-modal", action="delete", username=user.username) }} + {{ button_confirm(label=_('Delete'), target="#delete-user-modal", action="delete", username=user.username, csrf_token=form.csrf_token.current_token) }} {% endfor %} @@ -69,6 +69,7 @@ {% macro user_form(form, id=None, user=None) %} +{{ form.csrf_token }} @@ -151,7 +152,7 @@ id='delete-user-modal', title=_('Delete User'), message=_("Are you sure you want to delete this User?"), - fields=['action', 'username'], + fields=['action', 'username', 'csrf_token'], submit=_('Delete')) }} {% endblock %} diff --git a/rdiffweb/templates/prefs_sshkeys.html b/rdiffweb/templates/prefs_sshkeys.html index 4fa44d3e..81482ee7 100644 --- a/rdiffweb/templates/prefs_sshkeys.html +++ b/rdiffweb/templates/prefs_sshkeys.html @@ -17,7 +17,7 @@ along with this program. If not, see . #} -
+
{% include 'message.html' %} @@ -58,7 +58,7 @@
  • - {{ button_confirm(label=_('Delete'), target="#delete-sshkey-modal", action="delete", fingerprint=key.fingerprint, disabled=not is_maintainer) }} + {{ button_confirm(label=_('Delete'), target="#delete-sshkey-modal", action="delete", fingerprint=key.fingerprint, csrf_token=form.csrf_token.current_token, disabled=not is_maintainer) }}
    {{ key.title }} @@ -74,6 +74,7 @@ {# Dialog to create SSH key. #} {% call modal_dialog('add-sshkey-modal',_('Add SSH key'), _('Add SSH key')) %} + {{ form.csrf_token }}
    {{ form.title.label(class="col-sm-2 control-label") }}
    @@ -99,5 +100,5 @@ id='delete-sshkey-modal', title=_('Delete SSH key'), message=_("Are you sure you want to delete this SSH Key?"), - fields=['action', 'fingerprint'], - submit=_('Delete')) }} + fields=['action', 'fingerprint', 'csrf_token'], + submit=_('Delete')) }} \ No newline at end of file diff --git a/rdiffweb/test.py b/rdiffweb/test.py index e0cd1941..40ab0730 100644 --- a/rdiffweb/test.py +++ b/rdiffweb/test.py @@ -21,12 +21,11 @@ @author: Patrik Dufresne """ - import json import os +import re import shutil import subprocess -import sys import tempfile import unittest import unittest.mock @@ -44,6 +43,7 @@ except: from urllib import urlencode # @UnresolvedImport @UnusedImport @Reimport + class MockRdiffwebApp(RdiffwebApp): def __init__(self, default_config={}): @@ -227,3 +227,13 @@ def getJson(self, *args, **kwargs): def _login(self, username=USERNAME, password=PASSWORD): self.getPage("/login/", method='POST', body={'login': username, 'password': password}) self.assertStatus('303 See Other') + + def getCsrfToken(self, url): + # Query page with CSRF token + self.getPage(url) + self.assertStatus(200) + self.assertInBody('csrf_token') + # Extract value. + m = re.search(']* value="(.[^"]*)">', self.body.decode('utf-8')) + self.assertTrue(m, 'expect csrf_token in body') + return m.group(1)